线程通信——通知与等待
# 线程通信-通知与等待
我们知道 在java中 线程的状态 有6种,初始(NEW),运行(runnable),阻塞(bloked),等待(waiting),超时等待(timed_waiting),终止(terminated) Object.wait() 使线程等待,释放锁,Object.notify(),Object.notifyAll() 唤醒等待的线程
# 那为什么要增加使线程等待,唤醒线程操作吗?
我的理解是在多线程中,可以更好的分配资源。就打个比方,我们有个业务A,业务A里面有很多小操作a,b,c等,这些小操作都是需要获取锁操作,这是有线程1过来执行业务A,他先获取到了a的锁,想继续往下获取b的锁,发现b的锁已经被被别的线程获取到了,如果都是使用的synchronized
关键字加锁的话,线程A会阻塞在这,如果一直获取不到b的锁,那么线程A也会一直阻塞在这(可能发生死锁了),这是我们大家都不愿意看到的。这样就需要线程之间互相协作,商量好哪个先获取锁哪个后获取锁。
# 利用notify和await实现阻塞队列(消费队列中的资源)
/**
* @author ggBall
* @version 1.0.0
* @ClassName BlockedQueue.java
* @Description 利用 notify 和 await 实现阻赛队列
* @createTime 2022年07月05日 11:40:00
*/
public class BlockedQueue<T> {
private final Lock lock = new ReentrantLock();
/**
* 队列不满条件
*/
private final Condition notFull = lock.newCondition();
/**
* 队列不为空条件
*/
private final Condition notEmpty = lock.newCondition();
private final Object[] items;
private int putIndex;
public BlockedQueue(Object[] items) {
this.items = items;
this.putIndex = 0;
}
public BlockedQueue(int size) {
this.items = new Object[size];
this.putIndex = 0;
}
public void enqueue(T t) {
lock.lock();
try {
// 队列已满
while (putIndex > (items.length-1)) {
notFull.await();
}
System.out.println(t+"入队");
items[putIndex] = t;
notEmpty.signal();
putIndex++;
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
public void dequeue(){
lock.lock();
try {
// 队列为空
while (items.length == 0) {
notEmpty.await();
}
System.out.println(Thread.currentThread().getName()+"出队");
items[--putIndex] = null;
// await() 和前面我们提到的 wait() 语义是一样的;signal() 和前面我们提到的 notify() 语义是一样的。
notFull.signal();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
public int actualSize() {
return putIndex;
}
public static void main(String[] args) throws InterruptedException {
BlockedQueue<String> queue = new BlockedQueue<String>(5);
for (int i = 0; i < 7; i++) {
new Thread(() -> {
queue.enqueue(Thread.currentThread().getName());
},"t"+i).start();
}
Thread.sleep(1000);
while (true) {
if (queue.actualSize() == 0) {
System.out.println("完全出队");
break;
}
try {
Thread.sleep(2000);
queue.dequeue();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
针对队列已满
,队列为空
条件控制线程的等待和运行,队列为空
线程不能再进行消费,队列已满
线程无法往其中发送消息。
# 利用await和signal 将异步转化为同步
/**
* @author ggBall
* @version 1.0.0
* @ClassName ConditionDemo.java
* @Description 异步转同步
* @createTime 2022年07月08日 15:18:00
*/
public class ConditionDemo {
private final Lock lock = new ReentrantLock();
private final Condition don = lock.newCondition();
public static void main(String[] args) {
// B异步给A发送消息,B等待 A收到后,处理一段时间会往C处回调,B去C处获取消息
ConditionDemo conditionDemo = new ConditionDemo();
conditionDemo.testAsyncToSync();
}
public void testAsyncToSync() {
Receiveer receiveer = new Receiveer();
// 发送消息
Thread thread = new Thread(new Msger("test", receiveer));
thread.start();
// 获取消息
String message = receiveer.getMessage();
System.out.println("message = " + message);
}
class Msger implements Runnable {
private String message;
private Receiveer receiveer;
Msger(String message,Receiveer receiveer) {
this.message = message;
this.receiveer = receiveer;
}
public void send() {
System.out.println("开始发送消息");
receiveer.setMessage(message);
}
@Override
public void run() {
send();
}
}
class Receiveer {
private String message;
public void setMessage(String message) {
try {
lock.lock();
Thread.sleep(3000);
this.message = message;
don.signal();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
public String getMessage() {
lock.lock();
try {
if (!isDone()) {
System.out.println("开始等待");
don.await();
System.out.println("结束等待");
}
return message;
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
public boolean isDone() {
return "".equals(message);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
结果
流程是 发送消息是异步操作,获取消息会使线程等待,直到消息送达之后,才唤醒等待线程。
场景:目前知名的 RPC 框架 Dubbo 就给我们做了异步转同步的事情
# 引用
如何避免重复创建线程?ubbo如何用管程实现异步转同步 (opens new window) 用“等待-通知”机制优化循环等待 (opens new window)
上次更新: 2022/08/23, 12:44:32