
Conversation

@RobertIndie
Member

@RobertIndie RobertIndie commented Apr 18, 2025

Fixes #1332

Motivation

The root cause is that the producer event loop becomes busy during reconnection, so messages waiting in dataChan never time out, and the ctx passed to SendAsync is not respected in this case.

SendAsync can wait until runEventLoop processes the request and pushes it into the pendingQueue or a batch, just as the Java client does. Before the request enters the pendingQueue, SendAsync itself checks for timeouts and invokes the callback; after it enters the pendingQueue, failTimeoutMessages manages the timeout.

Modifications

  • Introduced a new channel, enqueued, that makes SendAsync wait until the sendRequest has been added to the pending queue.
  • Used ctx to check for timeouts and invoke the callback when a timeout occurs.

Verifying this change

The test TestSendAsyncCouldTimeoutWhileReconnecting is based on a test from PR #1345.

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API: (yes / no)
  • The schema: (yes / no / don't know)
  • The default values of configurations: (yes / no)
  • The wire protocol: (yes / no)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented)
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation

@RobertIndie RobertIndie requested a review from Copilot April 18, 2025 09:39
@RobertIndie RobertIndie self-assigned this Apr 18, 2025
Contributor

Copilot AI left a comment


Pull Request Overview

This PR fixes an issue where SendAsync may not time out during producer reconnection by introducing a new channel (enqueued) to signal when send requests are added to the pending queue. It also adds tests to validate timeout behavior in reconnection scenarios and when the pending queue is full.

Reviewed Changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.

  • pulsar/producer_test.go: Added tests to verify SendAsync timeout behavior.
  • pulsar/producer_partition.go: Updated producer logic to include and use the new enqueued channel.

Comments suppressed due to low confidence (1)

pulsar/producer_test.go:2705

  • Avoid using fixed sleeps for synchronization in tests as they can lead to flaky results; consider using a more robust waiting mechanism (e.g., require.Eventually) to ensure the pending queue is properly filled before proceeding.
time.Sleep(3 * time.Second)

@RobertIndie RobertIndie marked this pull request as draft April 18, 2025 09:57
@RobertIndie RobertIndie marked this pull request as ready for review April 18, 2025 12:03
select {
case <-sr.enqueued:
case <-ctx.Done():
err := ctx.Err()
Contributor


Generally, the application/user does not set a timeout on the ctx it provides; maybe we should wrap the ctx with context.WithTimeout(ctx, config.timeout)?

}

p.dataChan <- sr
select {
Contributor


The select here will block SendAsync() while the producer is reconnecting even when the pending queue is not full, which is bad for latency-sensitive applications such as games.

Member Author


Yes, I agree. I have opened a new PR, #1422, which also addresses this issue. PTAL.

@gunli
Contributor

gunli commented Jul 31, 2025

I think the root cause of this bug is that our connection-management module is poorly designed: the boundary between it and the business-logic layer is unclear, and too much producer/consumer logic is mixed into it; put another way, the producer/consumer also takes on too many connection-management responsibilities. We can see a lot of similar connection-related code inside the producer and consumer, and we need a refactor to solve the problem properly. Adding a new channel and another goroutine event loop, as in #1345, only solves the problem for the sake of solving it: it treats the symptom rather than the cause and adds complexity, and the event loop in the producer is already complicated enough. If the goal is just to fix the bug, a change like #1333 would honestly be better.

I think the connection-management module should only do its own job: create connections, close connections, handle heartbeats, load balancing, and reconnection, and expose interfaces for acquiring and releasing connections, like a connection pool. When a connection closes abnormally, it should recover the connection in its own internal goroutine. As for the producer and consumer: when a connection fails, they should just ask the connection-management module for a new connection, and if none is available, simply stop sending data. Since the SDK provides an asynchronous interface, from the user's perspective, apart from a full queue, internal SDK problems must not affect (block) my writes; at worst they eventually fail. For data already written into the queue, if you cannot send it, let it time out, tell me it failed, and keep sending once you recover. Today, when a connection fails, the producer loops endlessly trying to recover it, which should not be its responsibility. The same code also exists in the consumer, and much of it is duplicated: some bugs fixed in the producer must be fixed again in the consumer, and someone who only knows one side will not fix the other. I have submitted PRs myself that changed only the consumer, or only the producer; the same problem gets fixed on both sides only when both are known. All of this should be pushed down into the connection-management layer.

@RobertIndie
Member Author


Thanks for your analysis. I agree that we should separate connection-related logic, such as reconnection, from the data and command event loop. However, the producer still needs to handle the reconnection logic because it is responsible for message resending. The connection layer only abstracts the underlying TCP connection for sending requests and receiving responses. Deciding which requests to send and what to do after reconnecting should be the producer's responsibility. Therefore, the current division of responsibilities looks good to me. We just need to ensure the connection logic and data logic do not run in the same event loop.

Also, if we separate them into two event loops (a reconnection loop and a data loop), we must be very careful about message ordering, because both loops can then send messages on the connection: the reconnection loop resends messages after a connection is re-established, while the data loop sends new messages. Getting this wrong could lead to the issue described in #1027.

I have proposed a new PR that fixes this in another way and also solves the issues mentioned above: #1422.



Successfully merging this pull request may close these issues.

[Bug][Producer] The callback was not invoked during reconnecting.
