-
Notifications
You must be signed in to change notification settings - Fork 277
Description
Before Creating the Bug Report
-
I found a bug, not just asking a question, which should be created in GitHub Discussions.
-
I have searched the GitHub Issues and GitHub Discussions of this repository and believe that this is not a duplicate.
-
I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.
Programming Language of the Client
Java
Runtime Platform Environment
windows11
RocketMQ Version of the Client/Server
5.0.2
Run or Compiler Version
JDK17
Describe the Bug
ProducerImpl#
private void send0(SettableFuture<List> future0, String topic, MessageType messageType,
final List candidates, final List messages, final int attempt) {
// Calculate the current message queue.
final MessageQueueImpl mq = candidates.get(IntMath.mod(attempt - 1, candidates.size()));
final List acceptMessageTypes = mq.getAcceptMessageTypes();
if (publishingSettings.isValidateMessageType() && !acceptMessageTypes.contains(messageType)) {
final IllegalArgumentException e = new IllegalArgumentException("Current message type not match with "
+ "topic accept message types, topic=" + topic + ", actualMessageType=" + messageType + ", "
+ "acceptMessageTypes=" + acceptMessageTypes);
future0.setException(e);
return;
}
final Endpoints endpoints = mq.getBroker().getEndpoints();
final ListenableFuture<List> future = send0(endpoints, messages, mq);
final int maxAttempts = this.getRetryPolicy().getMaxAttempts();
// Intercept before message publishing.
final List<GeneralMessage> generalMessages = messages.stream().map((Function<PublishingMessageImpl,
GeneralMessage>) GeneralMessageImpl::new).collect(Collectors.toList());
final MessageInterceptorContextImpl context = new MessageInterceptorContextImpl(MessageHookPoints.SEND);
doBefore(context, generalMessages);
Futures.addCallback(future, new FutureCallback<List<SendReceiptImpl>>() {
@Override
public void onSuccess(List<SendReceiptImpl> sendReceipts) {
// Should never reach here.
if (sendReceipts.size() != messages.size()) {
final InternalErrorException e = new InternalErrorException("[Bug] due to an"
+ " unknown reason from remote, received send receipt's quantity " + sendReceipts.size()
+ " is not equal to sent message's quantity " + messages.size());
future0.setException(e);
// Intercept after message publishing.
final MessageInterceptorContextImpl context0 = new MessageInterceptorContextImpl(context,
MessageHookPointsStatus.ERROR);
doAfter(context0, generalMessages);
return;
}
// Intercept after message publishing.
final MessageInterceptorContextImpl context0 = new MessageInterceptorContextImpl(context,
MessageHookPointsStatus.OK);
doAfter(context0, generalMessages);
// No need more attempts.
future0.set(sendReceipts);
// Resend message(s) successfully.
if (1 < attempt) {
// Collect messageId(s) for logging.
List<MessageId> messageIds = new ArrayList<>();
for (SendReceipt receipt : sendReceipts) {
messageIds.add(receipt.getMessageId());
}
LOGGER.info("Resend message successfully, topic={}, messageId(s)={}, maxAttempts={}, "
+ "attempt={}, endpoints={}, clientId={}", topic, messageIds, maxAttempts, attempt,
endpoints, clientId);
}
// Send message(s) successfully on first attempt, return directly.
}
@Override
public void onFailure(Throwable t) {
// Intercept after message publishing.
final MessageInterceptorContextImpl context0 = new MessageInterceptorContextImpl(context,
MessageHookPointsStatus.ERROR);
doAfter(context0, generalMessages);
// Collect messageId(s) for logging.
List<MessageId> messageIds = new ArrayList<>();
for (PublishingMessageImpl message : messages) {
messageIds.add(message.getMessageId());
}
// Isolate endpoints because of sending failure.
isolate(endpoints);
if (attempt >= maxAttempts) {
// No need more attempts.
future0.setException(t);
LOGGER.error("Failed to send message(s) finally, run out of attempt times, maxAttempts={}, " +
"attempt={}, topic={}, messageId(s)={}, endpoints={}, clientId={}",
maxAttempts, attempt, topic, messageIds, endpoints, clientId, t);
return;
}
// No need more attempts for transactional message.
if (MessageType.TRANSACTION.equals(messageType)) {
future0.setException(t);
LOGGER.error("Failed to send transactional message finally, maxAttempts=1, attempt={}, " +
"topic={}, messageId(s)={}, endpoints={}, clientId={}", attempt, topic, messageIds,
endpoints, clientId, t);
return;
}
// Try to do more attempts.
int nextAttempt = 1 + attempt;
// Retry immediately if the request is not throttled.
if (!(t instanceof TooManyRequestsException)) {
LOGGER.warn("Failed to send message, would attempt to resend right now, maxAttempts={}, "
+ "attempt={}, topic={}, messageId(s)={}, endpoints={}, clientId={}", maxAttempts, attempt,
topic, messageIds, endpoints, clientId, t);
send0(future0, topic, messageType, candidates, messages, nextAttempt);
return;
}
final Duration delay = ProducerImpl.this.getRetryPolicy().getNextAttemptDelay(nextAttempt);
LOGGER.warn("Failed to send message due to too many requests, would attempt to resend after {}, "
+ "maxAttempts={}, attempt={}, topic={}, messageId(s)={}, endpoints={}, clientId={}", delay,
maxAttempts, attempt, topic, messageIds, endpoints, clientId, t);
ProducerImpl.this.getClientManager().getScheduler().schedule(() -> send0(future0, topic, messageType,
candidates, messages, nextAttempt), delay.toNanos(), TimeUnit.NANOSECONDS);
}
}, clientCallbackExecutor);
}
这里的before是不是存在调用时机错误? 感觉像是没注意。 ProducerImpl#
private void send0(SettableFuture<List> future0, String topic, MessageType messageType,
final List candidates, final List messages, final int attempt) {
// Calculate the current message queue.
final MessageQueueImpl mq = candidates.get(IntMath.mod(attempt - 1, candidates.size()));
final List acceptMessageTypes = mq.getAcceptMessageTypes();
if (publishingSettings.isValidateMessageType() && !acceptMessageTypes.contains(messageType)) {
final IllegalArgumentException e = new IllegalArgumentException("Current message type not match with "
+ "topic accept message types, topic=" + topic + ", actualMessageType=" + messageType + ", "
+ "acceptMessageTypes=" + acceptMessageTypes);
future0.setException(e);
return;
}
final Endpoints endpoints = mq.getBroker().getEndpoints();
final ListenableFuture<List> future = send0(endpoints, messages, mq);
final int maxAttempts = this.getRetryPolicy().getMaxAttempts();
// Intercept before message publishing.
final List<GeneralMessage> generalMessages = messages.stream().map((Function<PublishingMessageImpl,
GeneralMessage>) GeneralMessageImpl::new).collect(Collectors.toList());
final MessageInterceptorContextImpl context = new MessageInterceptorContextImpl(MessageHookPoints.SEND);
doBefore(context, generalMessages);
Futures.addCallback(future, new FutureCallback<List<SendReceiptImpl>>() {
@Override
public void onSuccess(List<SendReceiptImpl> sendReceipts) {
// Should never reach here.
if (sendReceipts.size() != messages.size()) {
final InternalErrorException e = new InternalErrorException("[Bug] due to an"
+ " unknown reason from remote, received send receipt's quantity " + sendReceipts.size()
+ " is not equal to sent message's quantity " + messages.size());
future0.setException(e);
// Intercept after message publishing.
final MessageInterceptorContextImpl context0 = new MessageInterceptorContextImpl(context,
MessageHookPointsStatus.ERROR);
doAfter(context0, generalMessages);
return;
}
// Intercept after message publishing.
final MessageInterceptorContextImpl context0 = new MessageInterceptorContextImpl(context,
MessageHookPointsStatus.OK);
doAfter(context0, generalMessages);
// No need more attempts.
future0.set(sendReceipts);
// Resend message(s) successfully.
if (1 < attempt) {
// Collect messageId(s) for logging.
List<MessageId> messageIds = new ArrayList<>();
for (SendReceipt receipt : sendReceipts) {
messageIds.add(receipt.getMessageId());
}
LOGGER.info("Resend message successfully, topic={}, messageId(s)={}, maxAttempts={}, "
+ "attempt={}, endpoints={}, clientId={}", topic, messageIds, maxAttempts, attempt,
endpoints, clientId);
}
// Send message(s) successfully on first attempt, return directly.
}
@Override
public void onFailure(Throwable t) {
// Intercept after message publishing.
final MessageInterceptorContextImpl context0 = new MessageInterceptorContextImpl(context,
MessageHookPointsStatus.ERROR);
doAfter(context0, generalMessages);
// Collect messageId(s) for logging.
List<MessageId> messageIds = new ArrayList<>();
for (PublishingMessageImpl message : messages) {
messageIds.add(message.getMessageId());
}
// Isolate endpoints because of sending failure.
isolate(endpoints);
if (attempt >= maxAttempts) {
// No need more attempts.
future0.setException(t);
LOGGER.error("Failed to send message(s) finally, run out of attempt times, maxAttempts={}, " +
"attempt={}, topic={}, messageId(s)={}, endpoints={}, clientId={}",
maxAttempts, attempt, topic, messageIds, endpoints, clientId, t);
return;
}
// No need more attempts for transactional message.
if (MessageType.TRANSACTION.equals(messageType)) {
future0.setException(t);
LOGGER.error("Failed to send transactional message finally, maxAttempts=1, attempt={}, " +
"topic={}, messageId(s)={}, endpoints={}, clientId={}", attempt, topic, messageIds,
endpoints, clientId, t);
return;
}
// Try to do more attempts.
int nextAttempt = 1 + attempt;
// Retry immediately if the request is not throttled.
if (!(t instanceof TooManyRequestsException)) {
LOGGER.warn("Failed to send message, would attempt to resend right now, maxAttempts={}, "
+ "attempt={}, topic={}, messageId(s)={}, endpoints={}, clientId={}", maxAttempts, attempt,
topic, messageIds, endpoints, clientId, t);
send0(future0, topic, messageType, candidates, messages, nextAttempt);
return;
}
final Duration delay = ProducerImpl.this.getRetryPolicy().getNextAttemptDelay(nextAttempt);
LOGGER.warn("Failed to send message due to too many requests, would attempt to resend after {}, "
+ "maxAttempts={}, attempt={}, topic={}, messageId(s)={}, endpoints={}, clientId={}", delay,
maxAttempts, attempt, topic, messageIds, endpoints, clientId, t);
ProducerImpl.this.getClientManager().getScheduler().schedule(() -> send0(future0, topic, messageType,
candidates, messages, nextAttempt), delay.toNanos(), TimeUnit.NANOSECONDS);
}
}, clientCallbackExecutor);
}
这里的before是不是存在调用时机错误? 感觉像是没注意。 在dobefore的调用时间在 final ListenableFuture<List> future = send0(endpoints, messages, mq);之后?
既然是发送消息之前拦截 应该是在其之前把? 对照了MessageInterceptor #doBefore的其他实现 都是在ListenableFuture<List> future = send0(endpoints, messages, mq);之前
Steps to Reproduce
null
What Did You Expect to See?
期望是正确的调用顺序
What Did You See Instead?
非正常调用顺序
Additional Context
null