Skip to content

Commit e49cd55

Browse files
feat(datastructures): add thread-safe bounded queue implementation (#7428)
* feat(datastructures): add thread-safe bounded queue implementation Implements a thread-safe blocking queue using ReentrantLock and Condition variables for producer-consumer synchronization. ### What This Adds **ThreadSafeQueue.java** - Thread-safe bounded queue: - `enqueue()` - Blocking add to tail, waits when queue is full - `dequeue()` - Blocking remove from head, waits when queue is empty - `offer()` - Non-blocking add, returns false when full - `poll()` - Non-blocking remove, returns null when empty - `size()`, `isEmpty()`, `isFull()`, `capacity()` - State queries - Uses circular buffer for O(1) enqueue/dequeue operations - Supports multiple concurrent producers and consumers **ThreadSafeQueueTest.java** - Comprehensive test suite: - Basic enqueue/dequeue operations - Offer/poll non-blocking behavior - Null rejection validation - Invalid capacity rejection - Circular buffer wrap-around - Multiple producers single consumer concurrency - Single producer multiple consumers concurrency - Blocking behavior verification - Stress test with 8 concurrent threads ### Algorithm Uses a circular buffer with ReentrantLock and two Condition variables: - `notFull` - signaled when space becomes available - `notEmpty` - signaled when items are added - Producers await notFull when buffer is full - Consumers await notEmpty when buffer is empty - Signal opposite condition after each operation Time: O(1) enqueue/dequeue | Space: O(n) bounded buffer ### Reference https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem * fix(datastructures): correct test capacity and simplify concurrent test - testOfferPoll: Changed capacity from 3 to 2 so third offer correctly fails - testMultipleProducersSingleConsumer: Removed startLatch, use dedicated consumer thread with synchronized results list for thread safety * fix(datastructures): remove unused assertArrayEquals import Checkstyle flagged UnusedImports violation for org.junit.jupiter.api.Assertions.assertArrayEquals which was not used in any test method. * fix: replace signal() with signalAll() to satisfy SpotBugs MDM_SIGNAL_NOT_SIGNALALL SpotBugs flags all four Condition.signal() calls in ThreadSafeQueue as Medium severity bugs (MDM_SIGNAL_NOT_SIGNALALL). In a multi-producer/multi-consumer scenario, signal() wakes only one waiting thread, which can cause deadlock when multiple producers or consumers are blocked on the same condition variable. Using signalAll() ensures all waiting threads are notified and can re-check their loop condition, preventing the lost-wakeup problem that occurs when a single signal wakes a thread that cannot make progress. This change affects enqueue(), dequeue(), offer(), and poll() methods where notEmpty.signal() and notFull.signal() are replaced with notEmpty.signalAll() and notFull.signalAll() respectively. * test: replace static imports with Assertions prefix to satisfy PMD TooManyStaticImports PMD flags TooManyStaticImports when more than 4 static imports are present. The test file had 5 static imports from org.junit.jupiter.api.Assertions (equals, assertFalse, assertNull, assertThrows, assertTrue) which exceeded the default threshold. Replaced with regular import and Assertions. prefix to eliminate the PMD violation while maintaining readability.
1 parent 0a62b11 commit e49cd55

2 files changed

Lines changed: 481 additions & 0 deletions

File tree

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
package com.thealgorithms.datastructures.queues;
2+
3+
import java.util.concurrent.locks.Condition;
4+
import java.util.concurrent.locks.ReentrantLock;
5+
6+
/**
7+
* @brief Thread-safe bounded queue implementation using ReentrantLock and Condition variables
8+
* @details A blocking queue that supports multiple producers and consumers.
9+
* Uses a circular buffer internally with lock-based synchronization to ensure
10+
* thread safety. Producers block when the queue is full, and consumers block
11+
* when the queue is empty.
12+
* @see <a href="https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem">Producer-Consumer Problem</a>
13+
*/
14+
public class ThreadSafeQueue<T> {
15+
16+
private final Object[] buffer;
17+
private final int capacity;
18+
private int head;
19+
private int tail;
20+
private int count;
21+
private final ReentrantLock lock;
22+
private final Condition notFull;
23+
private final Condition notEmpty;
24+
25+
/**
26+
* @brief Constructs a ThreadSafeQueue with the specified capacity
27+
* @param capacity the maximum number of elements the queue can hold
28+
* @throws IllegalArgumentException if capacity is less than or equal to zero
29+
*/
30+
public ThreadSafeQueue(int capacity) {
31+
if (capacity <= 0) {
32+
throw new IllegalArgumentException("Capacity must be greater than zero.");
33+
}
34+
this.capacity = capacity;
35+
this.buffer = new Object[capacity];
36+
this.head = 0;
37+
this.tail = 0;
38+
this.count = 0;
39+
this.lock = new ReentrantLock();
40+
this.notFull = lock.newCondition();
41+
this.notEmpty = lock.newCondition();
42+
}
43+
44+
/**
45+
* @brief Adds an element to the tail of the queue, blocking if full
46+
* @param item the element to add
47+
* @throws InterruptedException if the thread is interrupted while waiting
48+
* @throws IllegalArgumentException if the item is null
49+
*/
50+
public void enqueue(T item) throws InterruptedException {
51+
if (item == null) {
52+
throw new IllegalArgumentException("Cannot enqueue null item.");
53+
}
54+
55+
lock.lock();
56+
try {
57+
while (count == capacity) {
58+
notFull.await();
59+
}
60+
buffer[tail] = item;
61+
tail = (tail + 1) % capacity;
62+
count++;
63+
notEmpty.signalAll();
64+
} finally {
65+
lock.unlock();
66+
}
67+
}
68+
69+
/**
70+
* @brief Removes and returns the element at the head of the queue, blocking if empty
71+
* @return the element at the head of the queue
72+
* @throws InterruptedException if the thread is interrupted while waiting
73+
*/
74+
@SuppressWarnings("unchecked")
75+
public T dequeue() throws InterruptedException {
76+
lock.lock();
77+
try {
78+
while (count == 0) {
79+
notEmpty.await();
80+
}
81+
T item = (T) buffer[head];
82+
buffer[head] = null;
83+
head = (head + 1) % capacity;
84+
count--;
85+
notFull.signalAll();
86+
return item;
87+
} finally {
88+
lock.unlock();
89+
}
90+
}
91+
92+
/**
93+
* @brief Adds an element to the tail of the queue without blocking
94+
* @param item the element to add
95+
* @return true if the element was added, false if the queue was full
96+
* @throws IllegalArgumentException if the item is null
97+
*/
98+
public boolean offer(T item) {
99+
if (item == null) {
100+
throw new IllegalArgumentException("Cannot enqueue null item.");
101+
}
102+
103+
lock.lock();
104+
try {
105+
if (count == capacity) {
106+
return false;
107+
}
108+
buffer[tail] = item;
109+
tail = (tail + 1) % capacity;
110+
count++;
111+
notEmpty.signalAll();
112+
return true;
113+
} finally {
114+
lock.unlock();
115+
}
116+
}
117+
118+
/**
119+
* @brief Removes and returns the element at the head without blocking
120+
* @return the element at the head, or null if the queue is empty
121+
*/
122+
@SuppressWarnings("unchecked")
123+
public T poll() {
124+
lock.lock();
125+
try {
126+
if (count == 0) {
127+
return null;
128+
}
129+
T item = (T) buffer[head];
130+
buffer[head] = null;
131+
head = (head + 1) % capacity;
132+
count--;
133+
notFull.signalAll();
134+
return item;
135+
} finally {
136+
lock.unlock();
137+
}
138+
}
139+
140+
/**
141+
* @brief Returns the number of elements in the queue
142+
* @return the current size of the queue
143+
*/
144+
public int size() {
145+
lock.lock();
146+
try {
147+
return count;
148+
} finally {
149+
lock.unlock();
150+
}
151+
}
152+
153+
/**
154+
* @brief Checks if the queue is empty
155+
* @return true if the queue contains no elements
156+
*/
157+
public boolean isEmpty() {
158+
lock.lock();
159+
try {
160+
return count == 0;
161+
} finally {
162+
lock.unlock();
163+
}
164+
}
165+
166+
/**
167+
* @brief Checks if the queue is full
168+
* @return true if the queue has reached its capacity
169+
*/
170+
public boolean isFull() {
171+
lock.lock();
172+
try {
173+
return count == capacity;
174+
} finally {
175+
lock.unlock();
176+
}
177+
}
178+
179+
/**
180+
* @brief Returns the maximum capacity of the queue
181+
* @return the capacity
182+
*/
183+
public int capacity() {
184+
return capacity;
185+
}
186+
}

0 commit comments

Comments
 (0)