Skip to content

fix(tmq): client does not poll data in a long time if there are some exceptions in channel#34781

Open
wangmm0220 wants to merge 1 commit into3.3.8from
fix/tmq-no-polling
Open

fix(tmq): client does not poll data in a long time if there are some exceptions in channel#34781
wangmm0220 wants to merge 1 commit into3.3.8from
fix/tmq-no-polling

Conversation

@wangmm0220
Copy link
Contributor

…exceptions in channel

Description

Issue(s)

  • Close/close/Fix/fix/Resolve/resolve: Issue Link

Checklist

Please check the items in the checklist if applicable.

  • Is the user manual updated?
  • Are the test cases passed and automated?
  • Is there no significant decrease in test coverage?

Copilot AI review requested due to automatic review settings March 13, 2026 12:02
@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses a critical issue where the TMQ client could stop polling data for prolonged periods due to channel exceptions. The fix involves adjusting the client's idle polling duration and implementing a safeguard to reset the virtual group's status if it becomes stuck. Concurrently, the PR refactors the TMQ system by completely removing the push-based data transfer mechanism, simplifying the overall architecture and focusing on a more robust polling model.

Highlights

  • Client Polling Logic Enhancement: The EMPTY_BLOCK_POLL_IDLE_DURATION in the TMQ client has been increased from 10ms to 100ms to prevent the client from becoming unresponsive and not polling data for extended periods, especially when exceptions occur in the channel. Additionally, a mechanism was introduced to reset the virtual group status to IDLE if it has been skipping polls for an excessive number of times (100,000 skips).
  • Removal of TMQ Push Mechanism: The entire tqPush mechanism, including its source file (tqPush.c), related data structures (pPushMgr in STQ), function declarations (tqPushMsg, tqProcessPollPush), and message handler registrations (TDMT_VND_TMQ_CONSUME_PUSH), has been removed from the vnode and management layers. This streamlines the TMQ data transfer model, likely consolidating it to a pure polling approach.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Changelog
  • source/client/src/clientTmq.c
    • Increased EMPTY_BLOCK_POLL_IDLE_DURATION from 10 to 100.
    • Added logic to reset vgStatus to TMQ_VG_STATUS__IDLE if vgSkipCnt reaches 100000.
  • source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
    • Removed registration for the TDMT_VND_TMQ_CONSUME_PUSH message handler.
  • source/dnode/vnode/CMakeLists.txt
    • Removed src/tq/tqPush.c from the list of source files to be compiled.
  • source/dnode/vnode/src/inc/tq.h
    • Removed the pPushMgr member from the STQ structure.
  • source/dnode/vnode/src/inc/vnodeInt.h
    • Removed the function declarations for tqPushMsg and tqProcessPollPush.
  • source/dnode/vnode/src/tq/tq.c
    • Removed the initialization and cleanup of the pPushMgr hash object.
    • Commented out calls to tqUnregisterPushHandle in tqProcessSeekReq, tqProcessDeleteSubReq, and tqProcessSubscribeReq.
    • Removed the entire tqProcessPollPush function implementation.
  • source/dnode/vnode/src/tq/tqPush.c
    • Removed the entire file, which contained the implementation for the TMQ push mechanism.
  • source/dnode/vnode/src/tq/tqUtil.c
    • Removed the conditional block that registered a push handle if no WAL log existed and no data blocks were received.
  • source/dnode/vnode/src/vnd/vnodeSvr.c
    • Removed the call to tqPushMsg within vnodeProcessWriteMsg.
    • Removed the TDMT_VND_TMQ_CONSUME_PUSH case from the vnodeProcessQueryMsg switch statement.
Activity
  • No specific activity (comments, reviews, progress) has been recorded for this pull request yet.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request addresses a bug where the TMQ client could get stuck and stop polling for data. The fix involves adding a timeout based on the number of skipped polls for a vgroup, which resets its status and allows polling to resume. The idle duration for polling after receiving an empty block has also been increased. A significant portion of this pull request is dedicated to refactoring, which removes the TMQ push model and its associated code. My review includes suggestions to improve a debug log message for clarity and to replace a magic number with a named constant to enhance maintainability.

Comment on lines 2398 to 2399
tqDebugC("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId,
tmq->epoch, pVg->vgId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The debug message here is misleading. The idle duration was changed to 100ms (EMPTY_BLOCK_POLL_IDLE_DURATION), but the log message still refers to '10ms'. It would be better to make this message reflect the actual value to avoid confusion during debugging.

        tqDebugC("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %dms before start next poll", tmq->consumerId,
                 tmq->epoch, pVg->vgId, EMPTY_BLOCK_POLL_IDLE_DURATION);

}

// set status = idle if no response from vnode in a long time to avoid not polling data from vnode
if (atomic_load_32(&pVg->vgSkipCnt) == 100000) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The value 100000 is a magic number, making the code harder to understand and maintain. It's recommended to define it as a named constant, for example TMQ_MAX_VG_SKIP_COUNT, near other constants at the top of the file.

      if (atomic_load_32(&pVg->vgSkipCnt) == TMQ_MAX_VG_SKIP_COUNT) {

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR removes the TMQ “push-to-consumer” mechanism on the vnode side (push manager + internal CONSUME_PUSH message path) and adjusts the TMQ client polling behavior to rely on polling/backoff instead.

Changes:

  • Removed vnode/TQ push manager infrastructure (pPushMgr), push message dispatch, and related message handler paths.
  • Simplified TQ polling flow by removing push-handle registration logic when WAL reaches end.
  • Tuned client-side polling backoff (empty-block idle duration) and added a “stuck WAIT” recovery path.

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 7 comments.

Show a summary per file
File Description
source/dnode/vnode/src/vnd/vnodeSvr.c Removes write-time push trigger and query handling for CONSUME_PUSH.
source/dnode/vnode/src/tq/tqUtil.c Removes push-handle registration when no more WAL data is available.
source/dnode/vnode/src/tq/tqPush.c Deletes the push-manager implementation.
source/dnode/vnode/src/tq/tq.c Removes push-manager init/cleanup and deletes poll-push handler; leaves commented-out unregister calls.
source/dnode/vnode/src/inc/vnodeInt.h Drops tqPushMsg/tqProcessPollPush declarations; still declares push-handle APIs.
source/dnode/vnode/src/inc/tq.h Removes pPushMgr from STQ.
source/dnode/vnode/CMakeLists.txt Removes tqPush.c from build sources.
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c Stops registering TDMT_VND_TMQ_CONSUME_PUSH message handler.
source/client/src/clientTmq.c Increases empty-block polling idle duration and adds a “stuck vgStatus WAIT” reset.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

You can also share your feedback on Copilot code review. Take the survey.

Comment on lines 2398 to 2399
tqDebugC("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId,
tmq->epoch, pVg->vgId);
Comment on lines +2410 to +2413
// set status = idle if no response from vnode in a long time to avoid not polling data from vnode
if (atomic_load_32(&pVg->vgSkipCnt) == 100000) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
}
Comment on lines 286 to 289
// if consumer register to push manager, push empty to consumer to change vg status from TMQ_VG_STATUS__WAIT to
// TMQ_VG_STATUS__IDLE, otherwise poll data failed after seek.
tqUnregisterPushHandle(pTq, pHandle);
// tqUnregisterPushHandle(pTq, pHandle);
taosWUnLockLatch(&pTq->lock);
continue;
}
tqUnregisterPushHandle(pTq, pHandle);
// tqUnregisterPushHandle(pTq, pHandle);
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
atomic_store_32(&pHandle->epoch, 0);
tqUnregisterPushHandle(pTq, pHandle);
// tqUnregisterPushHandle(pTq, pHandle);
Comment on lines 260 to 261
int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg);
void tqUnregisterPushHandle(STQ* pTq, void* pHandle);
taosWUnLockLatch(&pTq->lock);
}

// reqOffset represents the current date offset, may be changed if wal not exists
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants