Skip to content

Commit 7db4980

Browse files
authored
Fix that ConcurrentLinkedDeque.removeIf is not actually thread-safe (apache#16598)
1 parent c72f67e commit 7db4980

File tree

2 files changed

+15
-9
lines changed

2 files changed

+15
-9
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/LoginLockManager.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -100,14 +100,19 @@ public LoginLockManager(
100100

101101
/** Inner class to store user lock information */
102102
static class UserLockInfo {
103+
103104
// Deque to store timestamps of failed attempts (milliseconds)
104-
private final Deque<Long> failureTimestamps = new ConcurrentLinkedDeque<>();
105+
private final Deque<Long> failureTimestamps;
106+
107+
UserLockInfo(int capacity) {
108+
failureTimestamps = new ConcurrentLinkedDeque<>();
109+
}
105110

106-
void addFailureTime(long timestamp) {
111+
synchronized void addFailureTime(long timestamp) {
107112
failureTimestamps.addLast(timestamp);
108113
}
109114

110-
void removeOldFailures(long cutoffTime) {
115+
synchronized void removeOldFailures(long cutoffTime) {
111116
// Remove timestamps older than cutoffTime
112117
failureTimestamps.removeIf(timestamp -> timestamp < cutoffTime);
113118
}
@@ -191,15 +196,16 @@ public void recordFailure(long userId, String ip) {
191196
userIpKey,
192197
(key, existing) -> {
193198
if (existing == null) {
194-
existing = new UserLockInfo();
199+
existing =
200+
new UserLockInfo(Math.max(failedLoginAttempts, failedLoginAttemptsPerUser));
195201
}
196202
// Remove failures outside of sliding window
197203
existing.removeOldFailures(cutoffTime);
198204
// Record this failure
199205
existing.addFailureTime(now);
200206
// Check if threshold reached (log only when it just reaches)
201207
int failCountIp = existing.getFailureCount();
202-
if (failCountIp >= failedLoginAttempts && failCountIp == failedLoginAttempts) {
208+
if (failCountIp >= failedLoginAttempts) {
203209
LOGGER.info("IP '{}' locked for user ID '{}'", ip, userId);
204210
}
205211
return existing;
@@ -212,16 +218,16 @@ public void recordFailure(long userId, String ip) {
212218
userId,
213219
(key, existing) -> {
214220
if (existing == null) {
215-
existing = new UserLockInfo();
221+
existing =
222+
new UserLockInfo(Math.max(failedLoginAttempts, failedLoginAttemptsPerUser));
216223
}
217224
// Remove failures outside of sliding window
218225
existing.removeOldFailures(cutoffTime);
219226
// Record this failure
220227
existing.addFailureTime(now);
221228
// Check if threshold reached (log only when it just reaches)
222229
int failCountUser = existing.getFailureCount();
223-
if (failCountUser >= failedLoginAttemptsPerUser
224-
&& failCountUser == failedLoginAttemptsPerUser) {
230+
if (failCountUser >= failedLoginAttemptsPerUser) {
225231
LOGGER.info(
226232
"User ID '{}' locked due to {} failed attempts",
227233
userId,

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/auth/LoginLockManagerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -533,9 +533,9 @@ public void testConcurrentLockStateConsistency() throws InterruptedException {
533533

534534
@Test
535535
public void testConcurrentOperateLockInfo() throws InterruptedException, ExecutionException {
536-
UserLockInfo userLockInfo = new UserLockInfo();
537536
int numThreads = 100;
538537
final int numAttempts = 100000;
538+
UserLockInfo userLockInfo = new UserLockInfo(numThreads * numAttempts);
539539
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
540540
List<Future<Void>> threads = new ArrayList<>(numThreads);
541541
for (int i = 0; i < numThreads; i++) {

0 commit comments

Comments
 (0)