Skip to content

Commit c983c04

Browse files
egusevartembilan
authored andcommitted
GH-1014: Add addMdcAsHeaders into appenders
Fixes #1014 **Cherry-pick to 2.1.x, 2.0.x & 1.7.x** GH-1014 minor changes GH-1014 renamed property to addMdcAsHeaders GH-1014 added addMdcAsHeaders into documentation GH-1014 added addMdcAsHeaders into logback appender. added integration test GH-1014 updated documentation GH-1014 minor fix GH-1014 updated documentation GH-1014 minor fix GH-1014 removed this prefix * Made addMdcAsHeaders true by default * Polishing # Conflicts: # spring-rabbit/src/test/java/org/springframework/amqp/rabbit/log4j2/AmqpAppenderTests.java # spring-rabbit/src/test/java/org/springframework/amqp/rabbit/logback/AmqpAppenderIntegrationTests.java
1 parent 559a320 commit c983c04

File tree

8 files changed

+192
-56
lines changed

8 files changed

+192
-56
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/log4j2/AmqpAppender.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686
* @author Artem Bilan
8787
* @author Dominique Villard
8888
* @author Nicolas Ristock
89+
* @author Eugene Gusev
8990
*
9091
* @since 1.6
9192
*/
@@ -183,7 +184,9 @@ public static AmqpAppender createAppender(// NOSONAR NCSS line count
183184
@PluginAttribute("async") boolean async,
184185
@PluginAttribute("charset") String charset,
185186
@PluginAttribute(value = "bufferSize", defaultInt = Integer.MAX_VALUE) int bufferSize,
186-
@PluginElement(BlockingQueueFactory.ELEMENT_TYPE) BlockingQueueFactory<Event> blockingQueueFactory) {
187+
@PluginElement(BlockingQueueFactory.ELEMENT_TYPE) BlockingQueueFactory<Event> blockingQueueFactory,
188+
@PluginAttribute(value = "addMdcAsHeaders", defaultBoolean = true) boolean addMdcAsHeaders) {
189+
187190
if (name == null) {
188191
LOGGER.error("No name for AmqpAppender");
189192
}
@@ -226,6 +229,7 @@ public static AmqpAppender createAppender(// NOSONAR NCSS line count
226229
manager.clientConnectionProperties = clientConnectionProperties;
227230
manager.charset = charset;
228231
manager.async = async;
232+
manager.addMdcAsHeaders = addMdcAsHeaders;
229233

230234
BlockingQueue<Event> eventQueue;
231235
if (blockingQueueFactory == null) {
@@ -283,7 +287,7 @@ protected Message postProcessMessageBeforeSend(Message message, Event event) {
283287
return message;
284288
}
285289

286-
private void sendEvent(Event event, Map<?, ?> properties) {
290+
protected void sendEvent(Event event, Map<?, ?> properties) {
287291
LogEvent logEvent = event.getEvent();
288292
String name = logEvent.getLoggerName();
289293
Level level = logEvent.getLevel();
@@ -312,8 +316,10 @@ private void sendEvent(Event event, Map<?, ?> properties) {
312316
amqpProps.setTimestamp(tstamp.getTime());
313317

314318
// Copy properties in from MDC
315-
for (Entry<?, ?> entry : properties.entrySet()) {
316-
amqpProps.setHeader(entry.getKey().toString(), entry.getValue());
319+
if (this.manager.addMdcAsHeaders) {
320+
for (Entry<?, ?> entry : properties.entrySet()) {
321+
amqpProps.setHeader(entry.getKey().toString(), entry.getValue());
322+
}
317323
}
318324
if (logEvent.getSource() != null) {
319325
amqpProps.setHeader(
@@ -326,7 +332,7 @@ private void sendEvent(Event event, Map<?, ?> properties) {
326332
doSend(event, logEvent, amqpProps);
327333
}
328334

329-
private void doSend(Event event, LogEvent logEvent, MessageProperties amqpProps) {
335+
protected void doSend(Event event, LogEvent logEvent, MessageProperties amqpProps) {
330336
StringBuilder msgBody;
331337
String routingKey;
332338
try {
@@ -603,6 +609,11 @@ protected static class AmqpManager extends AbstractManager {
603609
*/
604610
private String charset = Charset.defaultCharset().name();
605611

612+
/**
613+
* Whether or not add MDC properties into message headers. true by default for backward compatibility
614+
*/
615+
private boolean addMdcAsHeaders = true;
616+
606617
private boolean durable = true;
607618

608619
private MessageDeliveryMode deliveryMode = MessageDeliveryMode.PERSISTENT;

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/logback/AmqpAppender.java

Lines changed: 67 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@
9090
* @author Stephen Oakey
9191
* @author Dominique Villard
9292
* @author Nicolas Ristock
93+
* @author Eugene Gusev
9394
*
9495
* @since 1.4
9596
*/
@@ -296,6 +297,11 @@ public class AmqpAppender extends AppenderBase<ILoggingEvent> {
296297
*/
297298
private String charset;
298299

300+
/**
301+
* Whether or not add MDC properties into message headers. true by default for backward compatibility
302+
*/
303+
private boolean addMdcAsHeaders = true;
304+
299305
private boolean durable = true;
300306

301307
private MessageDeliveryMode deliveryMode = MessageDeliveryMode.PERSISTENT;
@@ -534,6 +540,14 @@ public void setMaxSenderRetries(int maxSenderRetries) {
534540
this.maxSenderRetries = maxSenderRetries;
535541
}
536542

543+
public boolean isAddMdcAsHeaders() {
544+
return this.addMdcAsHeaders;
545+
}
546+
547+
public void setAddMdcAsHeaders(boolean addMdcAsHeaders) {
548+
this.addMdcAsHeaders = addMdcAsHeaders;
549+
}
550+
537551
public boolean isDurable() {
538552
return this.durable;
539553
}
@@ -788,6 +802,54 @@ else if ("headers".equals(this.exchangeType)) {
788802
}
789803
}
790804

805+
protected MessageProperties prepareMessageProperties(Event event) {
806+
ILoggingEvent logEvent = event.getEvent();
807+
808+
String name = logEvent.getLoggerName();
809+
Level level = logEvent.getLevel();
810+
811+
MessageProperties amqpProps = new MessageProperties();
812+
amqpProps.setDeliveryMode(this.deliveryMode);
813+
amqpProps.setContentType(this.contentType);
814+
if (null != this.contentEncoding) {
815+
amqpProps.setContentEncoding(this.contentEncoding);
816+
}
817+
amqpProps.setHeader(CATEGORY_NAME, name);
818+
amqpProps.setHeader(THREAD_NAME, logEvent.getThreadName());
819+
amqpProps.setHeader(CATEGORY_LEVEL, level.toString());
820+
if (this.generateId) {
821+
amqpProps.setMessageId(UUID.randomUUID().toString());
822+
}
823+
824+
// Set timestamp
825+
Calendar tstamp = Calendar.getInstance();
826+
tstamp.setTimeInMillis(logEvent.getTimeStamp());
827+
amqpProps.setTimestamp(tstamp.getTime());
828+
829+
// Copy properties in from MDC
830+
if (this.addMdcAsHeaders) {
831+
Map<String, String> props = event.getProperties();
832+
Set<Entry<String, String>> entrySet = props.entrySet();
833+
for (Entry<String, String> entry : entrySet) {
834+
amqpProps.setHeader(entry.getKey(), entry.getValue());
835+
}
836+
}
837+
838+
String[] location = this.locationLayout.doLayout(logEvent).split("\\|");
839+
if (!"?".equals(location[0])) {
840+
amqpProps.setHeader(
841+
"location",
842+
String.format("%s.%s()[%s]", location[0], location[1], location[2]));
843+
}
844+
845+
// Set applicationId, if we're using one
846+
if (this.applicationId != null) {
847+
amqpProps.setAppId(this.applicationId);
848+
}
849+
850+
return amqpProps;
851+
}
852+
791853
/**
792854
* Subclasses may modify the final message before sending.
793855
* @param message The message.
@@ -810,50 +872,14 @@ public void run() {
810872
RabbitTemplate rabbitTemplate = new RabbitTemplate(AmqpAppender.this.connectionFactory);
811873
while (true) {
812874
final Event event = AmqpAppender.this.events.take();
813-
ILoggingEvent logEvent = event.getEvent();
814-
815-
String name = logEvent.getLoggerName();
816-
Level level = logEvent.getLevel();
817-
818-
MessageProperties amqpProps = new MessageProperties();
819-
amqpProps.setDeliveryMode(AmqpAppender.this.deliveryMode);
820-
amqpProps.setContentType(AmqpAppender.this.contentType);
821-
if (null != AmqpAppender.this.contentEncoding) {
822-
amqpProps.setContentEncoding(AmqpAppender.this.contentEncoding);
823-
}
824-
amqpProps.setHeader(CATEGORY_NAME, name);
825-
amqpProps.setHeader(THREAD_NAME, logEvent.getThreadName());
826-
amqpProps.setHeader(CATEGORY_LEVEL, level.toString());
827-
if (AmqpAppender.this.generateId) {
828-
amqpProps.setMessageId(UUID.randomUUID().toString());
829-
}
830-
831-
// Set timestamp
832-
Calendar tstamp = Calendar.getInstance();
833-
tstamp.setTimeInMillis(logEvent.getTimeStamp());
834-
amqpProps.setTimestamp(tstamp.getTime());
835-
836-
// Copy properties in from MDC
837-
Map<String, String> props = event.getProperties();
838-
Set<Entry<String, String>> entrySet = props.entrySet();
839-
for (Entry<String, String> entry : entrySet) {
840-
amqpProps.setHeader(entry.getKey(), entry.getValue());
841-
}
842-
String[] location = AmqpAppender.this.locationLayout.doLayout(logEvent).split("\\|");
843-
if (!"?".equals(location[0])) {
844-
amqpProps.setHeader(
845-
"location",
846-
String.format("%s.%s()[%s]", location[0], location[1], location[2]));
847-
}
848-
String routingKey = AmqpAppender.this.routingKeyLayout.doLayout(logEvent);
849-
// Set applicationId, if we're using one
850-
if (AmqpAppender.this.applicationId != null) {
851-
amqpProps.setAppId(AmqpAppender.this.applicationId);
852-
}
875+
876+
MessageProperties amqpProps = prepareMessageProperties(event);
877+
878+
String routingKey = AmqpAppender.this.routingKeyLayout.doLayout(event.getEvent());
853879

854880
sendOneEncoderPatternMessage(rabbitTemplate, routingKey);
855881

856-
doSend(rabbitTemplate, event, logEvent, name, amqpProps, routingKey);
882+
doSend(rabbitTemplate, event, event.getEvent(), name, amqpProps, routingKey);
857883
}
858884
}
859885
catch (InterruptedException e) {

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/log4j2/AmqpAppenderTests.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
* @author Artem Bilan
6868
* @author Dominique Villard
6969
* @author Nicolas Ristock
70+
* @author Eugene Gusev
7071
*
7172
* @since 1.6
7273
*/
@@ -162,6 +163,8 @@ public void testProperties() {
162163
assertEquals(5, TestUtils.getPropertyValue(manager, "maxSenderRetries"));
163164
// change the property to true and this fails and test() randomly fails too.
164165
assertFalse(TestUtils.getPropertyValue(manager, "async", Boolean.class));
166+
// default value
167+
assertTrue(TestUtils.getPropertyValue(manager, "addMdcAsHeaders", Boolean.class));
165168

166169
assertEquals(10, TestUtils.getPropertyValue(appender, "events.items", Object[].class).length);
167170

@@ -176,7 +179,11 @@ public void testAmqpAppenderEventQueueTypeDefaultsToLinkedBlockingQueue() throws
176179
Map.class).get("rabbitmq_default_queue");
177180

178181
Object events = TestUtils.getPropertyValue(appender, "events");
179-
assertEquals(LinkedBlockingQueue.class, events.getClass());
182+
183+
Object manager = TestUtils.getPropertyValue(appender, "manager");
184+
assertTrue(TestUtils.getPropertyValue(manager, "addMdcAsHeaders", Boolean.class));
185+
186+
assertThat(events, instanceOf(LinkedBlockingQueue.class));
180187
}
181188

182189
@Test
@@ -192,6 +199,7 @@ public void testUriProperties() {
192199
assertNull(TestUtils.getPropertyValue(manager, "username"));
193200
assertNull(TestUtils.getPropertyValue(manager, "password"));
194201
assertNull(TestUtils.getPropertyValue(manager, "virtualHost"));
202+
assertFalse(TestUtils.getPropertyValue(manager, "addMdcAsHeaders", Boolean.class));
195203
}
196204

197205
@Test

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/logback/AmqpAppenderIntegrationTests.java

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import static org.hamcrest.Matchers.containsString;
2020
import static org.hamcrest.Matchers.equalTo;
21+
import static org.hamcrest.Matchers.hasEntry;
2122
import static org.hamcrest.Matchers.instanceOf;
2223
import static org.hamcrest.Matchers.is;
2324
import static org.hamcrest.Matchers.not;
@@ -61,6 +62,7 @@
6162
/**
6263
* @author Artem Bilan
6364
* @author Nicolas Ristock
65+
* @author Eugene Gusev
6466
*
6567
* @since 1.4
6668
*/
@@ -84,15 +86,20 @@ public class AmqpAppenderIntegrationTests {
8486
@Autowired
8587
private Queue encodedQueue;
8688

89+
@Autowired
90+
private Queue testQueue;
91+
8792
private SimpleMessageListenerContainer listenerContainer;
8893

8994
@Before
90-
public void setUp() throws Exception {
91-
listenerContainer = applicationContext.getBean(SimpleMessageListenerContainer.class);
95+
public void setUp() {
96+
this.listenerContainer = this.applicationContext.getBean(SimpleMessageListenerContainer.class);
97+
MDC.clear();
9298
}
9399

94100
@After
95101
public void tearDown() {
102+
MDC.clear();
96103
listenerContainer.shutdown();
97104
}
98105

@@ -135,7 +142,8 @@ public void testAppenderWithProps() throws InterruptedException {
135142
assertNotNull(location);
136143
assertThat(location, instanceOf(String.class));
137144
assertThat((String) location,
138-
startsWith("org.springframework.amqp.rabbit.logback.AmqpAppenderIntegrationTests.testAppenderWithProps()"));
145+
startsWith("org.springframework.amqp.rabbit.logback.AmqpAppenderIntegrationTests.testAppenderWithProps" +
146+
"()"));
139147
Object threadName = messageProperties.getHeaders().get("thread");
140148
assertNotNull(threadName);
141149
assertThat(threadName, instanceOf(String.class));
@@ -187,6 +195,32 @@ public void customQueueIsUsedIfProvided() throws Exception {
187195
verify(appenderQueue).add(argThat(arg -> arg.getEvent().getMessage().equals(testMessage)));
188196
}
189197

198+
@Test
199+
public void testAddMdcAsHeaders() {
200+
this.applicationContext.getBean(SingleConnectionFactory.class).createConnection().close();
201+
202+
Logger logWithMdc = (Logger) LoggerFactory.getLogger("withMdc");
203+
Logger logWithoutMdc = (Logger) LoggerFactory.getLogger("withoutMdc");
204+
MDC.put("mdc1", "test1");
205+
MDC.put("mdc2", "test2");
206+
207+
logWithMdc.info("test message with MDC in headers");
208+
Message received1 = this.template.receive(this.testQueue.getName());
209+
210+
assertNotNull(received1);
211+
assertEquals("test message with MDC in headers", new String(received1.getBody()));
212+
assertThat(received1.getMessageProperties().getHeaders(), hasEntry("mdc1", "test1"));
213+
assertThat(received1.getMessageProperties().getHeaders(), hasEntry("mdc2", "test2"));
214+
215+
logWithoutMdc.info("test message without MDC in headers");
216+
Message received2 = this.template.receive(this.testQueue.getName());
217+
218+
assertNotNull(received2);
219+
assertEquals("test message without MDC in headers", new String(received2.getBody()));
220+
assertThat(received1.getMessageProperties().getHeaders(), hasEntry("mdc1", "test1"));
221+
assertThat(received1.getMessageProperties().getHeaders(), hasEntry("mdc2", "test2"));
222+
}
223+
190224
public static class EnhancedAppender extends AmqpAppender {
191225

192226
private String foo;
@@ -224,6 +258,7 @@ protected BlockingQueue<Event> createEventQueue() {
224258
mockedQueue = mock(BlockingQueue.class);
225259
return mockedQueue;
226260
}
261+
227262
}
228263

229264
}

spring-rabbit/src/test/resources/log4j2-amqp-appender.xml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@
2222
addresses="localhost:5672"
2323
host="localhost" port="5672" user="guest" password="guest" applicationId="testAppId" charset="UTF-8"
2424
routingKeyPattern="%X{applicationId}.%c.%p"
25-
exchange="log4j2Test_default_queue" deliveryMode="NON_PERSISTENT">
25+
exchange="log4j2Test_default_queue" deliveryMode="NON_PERSISTENT"
26+
addMdcAsHeaders="true">
2627
</RabbitMQ>
2728
<RabbitMQ name="rabbitmq_uri"
2829
uri="amqp://guest:guest@localhost:5672/"
@@ -32,7 +33,8 @@
3233
contentType="text/plain" contentEncoding="UTF-8" generateId="true" deliveryMode="NON_PERSISTENT"
3334
charset="UTF-8"
3435
clientConnectionProperties="bar:foo,baz:qux"
35-
senderPoolSize="3" maxSenderRetries="5">
36+
senderPoolSize="3" maxSenderRetries="5"
37+
addMdcAsHeaders="false">
3638
</RabbitMQ>
3739
</Appenders>
3840
<Loggers>

0 commit comments

Comments
 (0)