Skip to content

Commit be3c642

Browse files
authored
GH-1479: ConsumerRecordMetadata
Resolves #1479 Because we pass the payload in as a discrete argument, standard headers such as `RECEIVED_TOPIC` will get the payload value if it's a String. See spring-projects/spring-framework#25033 Provide a mechanism to get the record metadata as a parameter instead. Make this available in all listeners for consistency. * Polishing
1 parent c831259 commit be3c642

File tree

7 files changed

+402
-6
lines changed

7 files changed

+402
-6
lines changed
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener.adapter;
18+
19+
import org.apache.kafka.clients.consumer.ConsumerRecord;
20+
import org.apache.kafka.clients.producer.RecordMetadata;
21+
import org.apache.kafka.common.TopicPartition;
22+
23+
import org.springframework.lang.Nullable;
24+
25+
/**
26+
* Utilities for listener adapters.
27+
*
28+
* @author Gary Russell
29+
* @since 2.5
30+
*
31+
*/
32+
public final class AdapterUtils {
33+
34+
private AdapterUtils() {
35+
}
36+
37+
/**
38+
* Build a {@link ConsumerRecordMetadata} from the first {@link ConsumerRecord} in data, if any.
39+
* @param data the data array.
40+
* @return the metadata or null if data does not contain a {@link ConsumerRecord}.
41+
*/
42+
@Nullable
43+
public static Object buildConsumerRecordMetadataFromArray(Object... data) {
44+
for (Object object : data) {
45+
ConsumerRecordMetadata metadata = buildConsumerRecordMetadata(object);
46+
if (metadata != null) {
47+
return metadata;
48+
}
49+
}
50+
return null;
51+
}
52+
53+
/**
54+
* Build a {@link ConsumerRecordMetadata} from data which must be a
55+
* {@link ConsumerRecord}.
56+
* @param data the record.
57+
* @return the metadata or null if data is not a {@link ConsumerRecord}.
58+
*/
59+
@Nullable
60+
public static ConsumerRecordMetadata buildConsumerRecordMetadata(Object data) {
61+
if (!(data instanceof ConsumerRecord)) {
62+
return null;
63+
}
64+
ConsumerRecord<?, ?> record = (ConsumerRecord<?, ?>) data;
65+
return new ConsumerRecordMetadata(new RecordMetadata(new TopicPartition(record.topic(), record.partition()),
66+
0, record.offset(), record.timestamp(), null, record.serializedKeySize(),
67+
record.serializedValueSize()), record.timestampType());
68+
}
69+
70+
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener.adapter;
18+
19+
import org.apache.kafka.clients.producer.RecordMetadata;
20+
import org.apache.kafka.common.record.TimestampType;
21+
22+
/**
23+
* Used to provide a listener method argument when the user supplies such a parameter.
24+
* Delegates to {@link RecordMetadata} (which is final, hence no subclass) for all except
25+
* timestamp type.
26+
*
27+
* @author Gary Russell
28+
* @since 2.5
29+
*
30+
*/
31+
public class ConsumerRecordMetadata {
32+
33+
private final RecordMetadata delegate;
34+
35+
private final TimestampType timestampType;
36+
37+
public ConsumerRecordMetadata(RecordMetadata delegate, TimestampType timestampType) {
38+
this.delegate = delegate;
39+
this.timestampType = timestampType;
40+
}
41+
42+
public boolean hasOffset() {
43+
return this.delegate.hasOffset();
44+
}
45+
46+
public long offset() {
47+
return this.delegate.offset();
48+
}
49+
50+
public boolean hasTimestamp() {
51+
return this.delegate.hasTimestamp();
52+
}
53+
54+
public long timestamp() {
55+
return this.delegate.timestamp();
56+
}
57+
58+
public int serializedKeySize() {
59+
return this.delegate.serializedKeySize();
60+
}
61+
62+
public int serializedValueSize() {
63+
return this.delegate.serializedValueSize();
64+
}
65+
66+
public String topic() {
67+
return this.delegate.topic();
68+
}
69+
70+
public int partition() {
71+
return this.delegate.partition();
72+
}
73+
74+
public TimestampType timestampType() {
75+
return this.timestampType;
76+
}
77+
78+
@Override
79+
public int hashCode() {
80+
return this.delegate.hashCode() + this.timestampType.name().hashCode();
81+
}
82+
83+
@Override
84+
public boolean equals(Object obj) {
85+
if (!(obj instanceof ConsumerRecordMetadata)) {
86+
return false;
87+
}
88+
return this.delegate.equals(obj)
89+
&& this.timestampType.equals(((ConsumerRecordMetadata) obj).timestampType());
90+
}
91+
92+
@Override
93+
public String toString() {
94+
return this.delegate.toString();
95+
}
96+
97+
}

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
1818

1919
import java.lang.annotation.Annotation;
2020
import java.lang.reflect.Method;
21+
import java.lang.reflect.Parameter;
2122
import java.util.ArrayList;
2223
import java.util.Arrays;
2324
import java.util.HashMap;
@@ -40,6 +41,7 @@
4041
import org.springframework.kafka.support.KafkaUtils;
4142
import org.springframework.lang.Nullable;
4243
import org.springframework.messaging.Message;
44+
import org.springframework.messaging.handler.HandlerMethod;
4345
import org.springframework.messaging.handler.annotation.Header;
4446
import org.springframework.messaging.handler.annotation.SendTo;
4547
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
@@ -123,8 +125,11 @@ public DelegatingInvocableHandler(List<InvocableHandlerMethod> handlers,
123125
Object bean, BeanExpressionResolver beanExpressionResolver, BeanExpressionContext beanExpressionContext,
124126
@Nullable BeanFactory beanFactory) {
125127

126-
this.handlers = new ArrayList<>(handlers);
127-
this.defaultHandler = defaultHandler;
128+
this.handlers = new ArrayList<>();
129+
for (InvocableHandlerMethod handler : handlers) {
130+
this.handlers.add(wrapIfNecessary(handler));
131+
}
132+
this.defaultHandler = wrapIfNecessary(defaultHandler);
128133
this.bean = bean;
129134
this.resolver = beanExpressionResolver;
130135
this.beanExpressionContext = beanExpressionContext;
@@ -133,6 +138,19 @@ public DelegatingInvocableHandler(List<InvocableHandlerMethod> handlers,
133138
: null;
134139
}
135140

141+
private InvocableHandlerMethod wrapIfNecessary(InvocableHandlerMethod handler) {
142+
if (handler == null) {
143+
return null;
144+
}
145+
Parameter[] parameters = handler.getMethod().getParameters();
146+
for (Parameter parameter : parameters) {
147+
if (parameter.getType().equals(ConsumerRecordMetadata.class)) {
148+
return new DelegatingInvocableHandler.MetadataAwareInvocableHandlerMethod(handler);
149+
}
150+
}
151+
return handler;
152+
}
153+
136154
/**
137155
* Return the bean for this handler.
138156
* @return the bean.
@@ -152,7 +170,16 @@ public Object getBean() {
152170
public Object invoke(Message<?> message, Object... providedArgs) throws Exception { //NOSONAR
153171
Class<? extends Object> payloadClass = message.getPayload().getClass();
154172
InvocableHandlerMethod handler = getHandlerForPayload(payloadClass);
155-
Object result = handler.invoke(message, providedArgs);
173+
Object result;
174+
if (handler instanceof MetadataAwareInvocableHandlerMethod) {
175+
Object[] args = new Object[providedArgs.length + 1];
176+
args[0] = AdapterUtils.buildConsumerRecordMetadataFromArray(providedArgs);
177+
System.arraycopy(providedArgs, 0, args, 1, providedArgs.length);
178+
result = handler.invoke(message, args);
179+
}
180+
else {
181+
result = handler.invoke(message, providedArgs);
182+
}
156183
Expression replyTo = this.handlerSendTo.get(handler);
157184
return new InvocationResult(result, replyTo, this.handlerReturnsMessage.get(handler));
158185
}
@@ -284,4 +311,17 @@ public boolean hasDefaultHandler() {
284311
return this.defaultHandler != null;
285312
}
286313

314+
/**
315+
* A handler method that is aware of {@link ConsumerRecordMetadata}.
316+
*
317+
* @since 2.5
318+
*/
319+
private static final class MetadataAwareInvocableHandlerMethod extends InvocableHandlerMethod {
320+
321+
MetadataAwareInvocableHandlerMethod(HandlerMethod handlerMethod) {
322+
super(handlerMethod);
323+
}
324+
325+
}
326+
287327
}

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,8 @@ public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerS
120120

121121
private boolean hasAckParameter;
122122

123+
private boolean hasMetadataParameter;
124+
123125
private boolean messageReturnType;
124126

125127
private ReplyHeadersConfigurer replyHeadersConfigurer;
@@ -324,7 +326,13 @@ protected final Object invokeHandler(Object data, Acknowledgment acknowledgment,
324326
return this.handlerMethod.invoke(message, acknowledgment, consumer);
325327
}
326328
else {
327-
return this.handlerMethod.invoke(message, data, acknowledgment, consumer);
329+
if (this.hasMetadataParameter) {
330+
return this.handlerMethod.invoke(message, data, acknowledgment, consumer,
331+
AdapterUtils.buildConsumerRecordMetadata(data));
332+
}
333+
else {
334+
return this.handlerMethod.invoke(message, data, acknowledgment, consumer);
335+
}
328336
}
329337
}
330338
catch (org.springframework.messaging.converter.MessageConversionException ex) {
@@ -562,6 +570,9 @@ protected Type determineInferredType(Method method) { // NOSONAR complexity
562570
isNotConvertible |= isAck;
563571
boolean isConsumer = parameterIsType(parameterType, Consumer.class);
564572
isNotConvertible |= isConsumer;
573+
boolean isMeta = parameterIsType(parameterType, ConsumerRecordMetadata.class);
574+
this.hasMetadataParameter |= isMeta;
575+
isNotConvertible |= isMeta;
565576
if (isNotConvertible) {
566577
notConvertibleParameters++;
567578
}

0 commit comments

Comments
 (0)