Skip to content

Conversation

@Caideyipi
Copy link
Collaborator

@Caideyipi Caideyipi commented Jan 7, 2026

Description

As the title said.
Currently, the pipe uses LinkedBlockingDeque as the lock. This design reserved space for shared lock amongst threads, but is useless in current framework and hard to debug:

  1. If a thread forgets to unlock, the thread cannot be shown in Jstack
  2. A thread may unlock another thread's lock.
    Hence, we changed the lock to a simple, single-thread lock currently.

This PR has:

  • been self-reviewed.
    • concurrent read
    • concurrent write
    • concurrent read and write
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods.
  • added or updated version, license, or notice information
  • added comments explaining the "why" and the intent of the code wherever would not be obvious
    for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold
    for code coverage.
  • added integration tests.
  • been tested in a test IoTDB cluster.

Key changed/added classes (or packages if there are too many classes) in this PR

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR refactors the PipeTaskCoordinatorLock implementation from using a LinkedBlockingDeque-based cross-thread lock mechanism to a standard ReentrantLock. The change simplifies the locking mechanism to enforce single-thread ownership, improving debuggability by making threads visible in Jstack and preventing threads from unlocking each other's locks.

Key changes:

  • Replaced LinkedBlockingDeque with ReentrantLock for thread synchronization
  • Removed ID generation and tracking mechanism for lock acquisitions
  • Eliminated the pipeTaskInfoHolder field from PipeTaskCoordinator and simplified lock/unlock methods

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.

File Description
PipeTaskCoordinatorLock.java Refactored from LinkedBlockingDeque-based implementation to ReentrantLock; simplified lock(), tryLock(), unlock(), and isLocked() methods; removed ID tracking
PipeTaskCoordinator.java Removed pipeTaskInfoHolder field; simplified tryLock() and lock() methods to directly return new AtomicReference instances; cleaned up unlock() method by removing holder management logic
Comments suppressed due to low confidence (1)

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java:31

  • The javadoc states this is "a cross thread lock" but the implementation has been changed to use ReentrantLock, which enforces same-thread ownership. A thread must unlock the same lock it acquired. This documentation is now inaccurate and should be updated to reflect that this is a single-thread lock, as mentioned in the PR description.
/**
 * {@link PipeTaskCoordinatorLock} is a cross thread lock for pipe task coordinator. It is used to
 * ensure that only one thread can execute the pipe task coordinator at the same time.
 */

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines 41 to 43
lock.lock();
LOGGER.debug(
"PipeTaskCoordinator lock acquired by thread {}", Thread.currentThread().getName());
Copy link

Copilot AI Jan 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The unlock() method now uses ReentrantLock.unlock() which will throw IllegalMonitorStateException if called by a thread that doesn't own the lock. However, the new lock() method at line 41 is not interruptible and doesn't handle InterruptedException, which could lead to uninterruptible waits. Consider using lockInterruptibly() instead to allow threads to be interrupted while waiting for the lock, similar to how tryLock() handles interruption.

Suggested change
lock.lock();
LOGGER.debug(
"PipeTaskCoordinator lock acquired by thread {}", Thread.currentThread().getName());
try {
lock.lockInterruptibly();
LOGGER.debug(
"PipeTaskCoordinator lock acquired by thread {}", Thread.currentThread().getName());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.error(
"Interrupted while waiting for PipeTaskCoordinator lock, current thread: {}",
Thread.currentThread().getName());
}

Copilot uses AI. Check for mistakes.
@jt2594838 jt2594838 merged commit 99ee13d into apache:master Jan 7, 2026
27 of 30 checks passed
@jt2594838 jt2594838 deleted the lock-change branch January 7, 2026 09:55
Caideyipi added a commit that referenced this pull request Jan 8, 2026
jt2594838 pushed a commit that referenced this pull request Jan 8, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants