Skip to content

Commit defd14a

Browse files
committed
interrupt blocked thread
1 parent 1bd6a27 commit defd14a

File tree

4 files changed

+236
-17
lines changed

4 files changed

+236
-17
lines changed

concurrency/src/main/java/thread/InterruptBizDemo.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,10 @@ public class InterruptBizDemo {
1818
private static final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
1919
private static final AtomicBoolean run = new AtomicBoolean(true);
2020

21-
public static void main(String[] args) throws InterruptedException {
22-
log.info("start");
23-
24-
singleThreadMode();
25-
// consumerScheduler();
26-
}
27-
2821
/**
2922
* 保活
3023
*/
31-
private static void consumerScheduler() throws InterruptedException {
24+
static void consumerScheduler() throws InterruptedException {
3225
Thread producer = new Thread(() -> {
3326
AtomicInteger count = new AtomicInteger();
3427
while (true) {
@@ -78,7 +71,7 @@ private static void cpuWork(int x) {
7871
}
7972
}
8073

81-
private static void singleThreadMode() throws InterruptedException {
74+
static void singleThreadMode() throws InterruptedException {
8275
Thread normal = new Thread(() -> log.info("normal"));
8376
normal.setName("normal");
8477
normal.start();

concurrency/src/main/java/thread/ThreadStatusTransfer.java

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,14 @@
22

33
import lombok.extern.slf4j.Slf4j;
44

5+
import java.util.concurrent.TimeUnit;
6+
57
/**
68
* Created by https://github.com/kuangcp on 17-8-20 下午8:46
79
* 1. 使用 wait notify notifyAll 需要对调用对象加锁
810
* 2. 调用 wait 后 线程状态Running变为Waiting, 将当前线程放入对象的等待队列上
911
* 3. notify 调用后, 等待线程 wait 处 不会立即返回, 需要等到 notify 这个线程释放锁后才有机会
10-
* 4. notify 调用后 将等待线程从等待队列放入同步队列, 线程状态 Waiting 变成 Blocked
12+
* 4. notify 调用后 将等待线程从等待队列放入同步队列, 等待的线程状态会从 Waiting 变成 Blocked
1113
*/
1214
@Slf4j
1315
public class ThreadStatusTransfer {
@@ -41,15 +43,50 @@ static class Notify implements Runnable {
4143
public void run() {
4244
synchronized (lock) {
4345
log.info("hold lock. notify");
44-
lock.notify();
4546
flag = false;
47+
// 除非有特殊考虑,一般都是使用notifyAll,避免有线程一直等不到唤醒
48+
// lock.notify();
49+
lock.notifyAll();
4650
}
4751

48-
// 这一段就是重新获取锁, 会和wait进行竞争, 所以执行顺序不定
52+
// 这一段就是重新获取锁, 会和wait逻辑进行竞争, 所以执行顺序不定
4953
synchronized (lock) {
5054
log.info("hold lock again");
5155
}
5256
}
5357
}
5458

59+
60+
static class BlockMarkWait extends Thread {
61+
private boolean holdLock = false;
62+
private final Object lock;
63+
64+
public BlockMarkWait(Object lock) {
65+
this.lock = lock;
66+
}
67+
68+
@Override
69+
public void run() {
70+
synchronized (lock) {
71+
try {
72+
log.info("start wait");
73+
lock.wait();
74+
holdLock = true;
75+
log.info("get lock");
76+
TimeUnit.SECONDS.sleep(1);
77+
} catch (InterruptedException e) {
78+
log.error(e.getMessage(), e);
79+
}
80+
}
81+
log.info("finish");
82+
holdLock = false;
83+
}
84+
85+
public void tryInterrupt() {
86+
if (!holdLock) {
87+
this.interrupt();
88+
}
89+
}
90+
}
91+
5592
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package thread;
2+
3+
import lombok.extern.slf4j.Slf4j;
4+
import org.junit.Test;
5+
6+
/**
7+
* @author Kuangcp
8+
* 2024-09-25 09:34
9+
*/
10+
@Slf4j
11+
public class InterruptBizDemoTest {
12+
13+
14+
@Test
15+
public void testSingleThread() throws Exception {
16+
log.info("start sin");
17+
18+
InterruptBizDemo.singleThreadMode();
19+
}
20+
21+
@Test
22+
public void testConsumer() throws Exception {
23+
log.info("start");
24+
25+
InterruptBizDemo.consumerScheduler();
26+
}
27+
}
Lines changed: 167 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,186 @@
11
package thread;
22

3-
import java.util.concurrent.TimeUnit;
4-
3+
import lombok.extern.slf4j.Slf4j;
54
import org.junit.Test;
65
import thread.ThreadStatusTransfer.Notify;
76
import thread.ThreadStatusTransfer.Wait;
7+
import thread.ThreadStatusTransfer.BlockMarkWait;
8+
9+
import java.util.concurrent.TimeUnit;
810

911
/**
1012
* @author kuangcp on 2019-04-22 9:40 AM
1113
*/
14+
@Slf4j
1215
public class ThreadStatusTransferTest {
1316

1417
@Test
1518
public void testMain() throws InterruptedException {
16-
Thread waitThread = new Thread(new Wait(), "WaitThread");
19+
Thread waitThread = new Thread(new Wait(), "Waiter");
1720
waitThread.start();
1821

19-
TimeUnit.SECONDS.sleep(100);
22+
TimeUnit.SECONDS.sleep(10);
2023

21-
Thread notifyThread = new Thread(new Notify(), "NotifyThread");
24+
Thread notifyThread = new Thread(new Notify(), "Notify");
2225
notifyThread.start();
2326
}
27+
28+
/**
29+
* 效果为 三个线程进入waiting状态,等notify后三个线程抢锁依次执行
30+
*/
31+
@Test
32+
public void testMultipleWait() throws Exception {
33+
Object lock = new Object();
34+
35+
Runnable taskHandler = () -> {
36+
synchronized (lock) {
37+
try {
38+
log.info("start wait");
39+
lock.wait();
40+
TimeUnit.SECONDS.sleep(1);
41+
} catch (InterruptedException e) {
42+
log.error(e.getMessage(), e);
43+
}
44+
log.info("finish");
45+
}
46+
};
47+
new Thread(taskHandler).start();
48+
new Thread(taskHandler).start();
49+
new Thread(taskHandler).start();
50+
51+
TimeUnit.SECONDS.sleep(1);
52+
new Thread(() -> {
53+
synchronized (lock) {
54+
log.info("hold lock. notify");
55+
lock.notifyAll();
56+
}
57+
synchronized (lock) {
58+
log.info("try lock");
59+
}
60+
}).start();
61+
62+
Thread.currentThread().join(5000);
63+
log.info("finish All");
64+
}
65+
66+
/**
67+
* 响应中断的方法: 线程进入等待或是超时等待的状态后,调用interrupt方法都是会响应中断的,
68+
* 即:Object.wait()、Thread.join、Thread.sleep、LockSupport.park的有参和无参方法。
69+
* <p>
70+
* 不响应中断的方法:线程进入阻塞状态后,是不响应中断的,等待进入synchronized的方法或是代码块,都是会被阻塞的,此时不会响应中断,
71+
* 另外还有一个不响应中断的,那就是阻塞在ReentrantLock.lock方法里面的线程,也是不响应中断的,
72+
* 如果想要响应中断,可以使用ReentrantLock.lockInterruptibly方法。
73+
*/
74+
@Test
75+
public void testBlockInterrupt() throws Exception {
76+
TimeUnit.SECONDS.sleep(6);
77+
log.info("start");
78+
Object lock = new Object();
79+
80+
Runnable taskHandler = () -> {
81+
synchronized (lock) {
82+
try {
83+
log.info("start wait");
84+
lock.wait();
85+
log.info("get lock");
86+
TimeUnit.SECONDS.sleep(1);
87+
} catch (InterruptedException e) {
88+
log.error(e.getMessage(), e);
89+
}
90+
log.info("finish");
91+
}
92+
};
93+
Thread a = new Thread(taskHandler);
94+
Thread b = new Thread(taskHandler);
95+
Thread c = new Thread(taskHandler);
96+
97+
a.start();
98+
b.start();
99+
c.start();
100+
101+
TimeUnit.SECONDS.sleep(1);
102+
new Thread(() -> {
103+
synchronized (lock) {
104+
try {
105+
log.info("hold lock. start notify");
106+
TimeUnit.MILLISECONDS.sleep(200);
107+
lock.notifyAll();
108+
log.info("finish notify");
109+
} catch (Exception e) {
110+
log.error("", e);
111+
}
112+
}
113+
synchronized (lock) {
114+
log.info("try lock");
115+
}
116+
}).start();
117+
118+
Thread kill = new Thread(() -> {
119+
try {
120+
TimeUnit.MILLISECONDS.sleep(500);
121+
// 由于中断了所有线程,包括了处于 timed_waiting 的线程也被中断,锁也因此释放,blocked的线程也因此拿到了锁后执行sleep方法抛出中断异常
122+
// 时间间隔短没法明确感受到 blocked 线程在发出中断信号的那一刻没作出响应
123+
a.interrupt();
124+
b.interrupt();
125+
c.interrupt();
126+
log.info("finish all interrupt");
127+
} catch (Exception e) {
128+
log.error("", e);
129+
}
130+
});
131+
kill.start();
132+
133+
Thread.currentThread().join(10000);
134+
log.info("exit");
135+
}
136+
137+
@Test
138+
public void testBlockInterrupt2() throws Exception {
139+
TimeUnit.SECONDS.sleep(6);
140+
log.info("start");
141+
Object lock = new Object();
142+
143+
BlockMarkWait a = new BlockMarkWait(lock);
144+
BlockMarkWait b = new BlockMarkWait(lock);
145+
BlockMarkWait c = new BlockMarkWait(lock);
146+
147+
a.start();
148+
b.start();
149+
c.start();
150+
151+
TimeUnit.SECONDS.sleep(1);
152+
new Thread(() -> {
153+
synchronized (lock) {
154+
try {
155+
log.info("hold lock. start notify");
156+
TimeUnit.MILLISECONDS.sleep(200);
157+
lock.notifyAll();
158+
log.info("finish notify");
159+
} catch (Exception e) {
160+
log.error("", e);
161+
}
162+
}
163+
synchronized (lock) {
164+
log.info("try lock");
165+
}
166+
}).start();
167+
168+
Thread kill = new Thread(() -> {
169+
try {
170+
TimeUnit.MILLISECONDS.sleep(500);
171+
// 加上判断后,只中断blocked的线程,timed_waiting线程不动,就能看到在interrupt的那一刻,blocked线程都是没反应的
172+
// 只有等waiting线程释放锁后(间隔700ms左右 1000-(500-200))blocked线程获得锁 执行sleep方法时才报错。
173+
a.tryInterrupt();
174+
b.tryInterrupt();
175+
c.tryInterrupt();
176+
log.info("finish all interrupt");
177+
} catch (Exception e) {
178+
log.error("", e);
179+
}
180+
});
181+
kill.start();
182+
183+
Thread.currentThread().join(10000);
184+
log.info("exit");
185+
}
24186
}

0 commit comments

Comments
 (0)