Appearance
线程间的通讯
- 线程间的通讯是指多个线程之间的信息交换。
- 线程间的通讯主要有两种方式:共享内存和消息传递。
- 共享内存是指多个线程共享同一块内存区域,线程之间通过这块内存区域来交换信息。
- 使用 `volatile` 关键字可以保证可见性,但是不能保证原子性;需要原子性时可以使用 `AtomicInteger` 等原子类或加锁。
- 共享内存的示例代码:
Java
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Demonstrates thread communication through shared memory: two threads
 * increment the same {@link AtomicInteger} and the main thread prints the
 * final value once both have finished.
 */
public class SharedMemoryDemo {
    /** Number of increments performed by each worker thread. */
    private static final int INCREMENTS_PER_THREAD = 10000;

    /** Counter shared by both worker threads. */
    private static final AtomicInteger count = new AtomicInteger(0);

    /**
     * Starts two workers, waits for both, then prints the counter.
     *
     * @param args unused
     * @throws RuntimeException wrapping the {@link InterruptedException} if the
     *     calling thread is interrupted while waiting for the workers
     */
    public static void main(String[] args) {
        // Both workers do exactly the same job, so they share one Runnable.
        Runnable incrementer = () -> {
            for (int i = 0; i < INCREMENTS_PER_THREAD; i++) {
                count.incrementAndGet();
            }
        };
        Thread thread1 = new Thread(incrementer);
        Thread thread2 = new Thread(incrementer);
        thread1.start();
        thread2.start();
        try {
            thread1.join();
            thread2.join();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still see it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        System.out.println(count);
    }
}
- 消息传递是指线程之间通过传递消息来交换信息。消息传递通常是一种更加安全的通讯方式,因为线程之间不直接读写同一个可变状态,而是通过线程安全的通道(例如 `BlockingQueue`)收发消息,同步由通道内部负责。
- 消息传递的示例代码:
Java
package io.alex.concurrent;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
 * Entry point of the message-passing example: wires one producer and one
 * consumer to the same bounded queue and runs each on its own thread.
 */
public class BlockingQueueExample {
    public static void main(String[] args) {
        // Bounded blocking queue with room for 5 messages.
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(5);

        // Both sides receive the same queue instance; it is their only link.
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);

        Thread producerThread = new Thread(producer);
        Thread consumerThread = new Thread(consumer);
        producerThread.start();
        consumerThread.start();
    }
}
/**
 * Puts a fixed number of messages ("Message 0" .. "Message 9") on a queue,
 * pausing between messages. Stops early if its thread is interrupted.
 */
class Producer implements Runnable {
    /** How many messages are produced before the task ends. */
    private static final int MESSAGE_COUNT = 10;

    /** Pause after each produced message, in milliseconds. */
    private static final long PAUSE_MILLIS = 500;

    private final BlockingQueue<String> queue;

    /**
     * @param queue the queue that produced messages are put on
     */
    public Producer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        // The try block surrounds the whole loop so that an interrupt ends
        // production instead of merely skipping one iteration.
        try {
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = "Message " + i;
                // put() blocks while the queue is full.
                queue.put(message);
                System.out.println("Produced: " + message);
                Thread.sleep(PAUSE_MILLIS);
            }
        } catch (InterruptedException e) {
            // Treat interruption as a stop request and keep the flag visible
            // to whoever owns this thread.
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Takes messages from a queue and prints them, pausing between messages.
 * Runs until its thread is interrupted.
 */
class Consumer implements Runnable {
    /** Pause after each consumed message, in milliseconds. */
    private static final long PAUSE_MILLIS = 1000;

    private final BlockingQueue<String> queue;

    /**
     * @param queue the queue that messages are taken from
     */
    public Consumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                // take() blocks until a message is available; the non-blocking
                // poll() returns null on an empty queue, which was then
                // printed as "Consumed: null".
                String message = queue.take();
                System.out.println("Consumed: " + message);
                Thread.sleep(PAUSE_MILLIS);
            }
        } catch (InterruptedException e) {
            // Interruption is the only way to stop this loop; restore the
            // flag and let the thread finish.
            Thread.currentThread().interrupt();
        }
    }
}
- 使用 wait 和 notify
Java
/**
 * Holder for a single string that can be read and replaced.
 */
class Message {
    /** Current text of the message. */
    private String text;

    /**
     * @param str initial text
     */
    public Message(String str) {
        text = str;
    }

    /**
     * @return the current text
     */
    public String getMsg() {
        return text;
    }

    /**
     * @param str replacement text
     */
    public void setMsg(String str) {
        text = str;
    }
}
/**
 * Entry point of the wait/notify example: one thread waits on a shared
 * {@code Message}, another thread notifies on it.
 */
class WaitNotifyExample {
    public static void main(String[] args) {
        // The single object both threads use as their monitor.
        Message shared = new Message("process it");

        Thread waiterThread = new Thread(new Waiter(shared), "waiter");
        waiterThread.start();

        Thread notifierThread = new Thread(new Notifier(shared), "notifier");
        notifierThread.start();

        System.out.println("All the threads are started");
    }
}
/**
 * Waits on a {@code Message}'s monitor until notified, then prints the
 * message's current text.
 */
class Waiter implements Runnable {
    private final Message msg;

    /**
     * @param m the message object whose monitor is waited on
     */
    public Waiter(Message m) {
        this.msg = m;
    }

    @Override
    public void run() {
        synchronized (msg) {
            try {
                System.out.println("Waiting for notify at time: " + System.currentTimeMillis());
                // NOTE(review): wait() is not guarded by a condition loop, so a
                // notify() issued before this thread starts waiting is lost and
                // a spurious wakeup is treated as a notification. Fixing that
                // needs a state flag that Message does not currently have.
                msg.wait();
            } catch (InterruptedException e) {
                // An interrupt is not a notification: restore the flag and
                // leave without reporting the message as processed.
                Thread.currentThread().interrupt();
                return;
            }
            System.out.println("Waiter thread got notified at time: " + System.currentTimeMillis());
            System.out.println("Processed: " + msg.getMsg());
        }
    }
}
/**
 * After a one-second delay, updates a {@code Message} and notifies one thread
 * waiting on its monitor.
 */
class Notifier implements Runnable {
    /** Delay before the notification is sent, in milliseconds. */
    private static final long DELAY_MILLIS = 1000;

    private final Message msg;

    /**
     * @param msg the message object to update and notify on
     */
    public Notifier(Message msg) {
        this.msg = msg;
    }

    @Override
    public void run() {
        try {
            // Sleep before taking the lock: sleeping inside the synchronized
            // block would hold the monitor for the whole delay and keep other
            // threads from entering their own synchronized blocks on msg.
            Thread.sleep(DELAY_MILLIS);
        } catch (InterruptedException e) {
            // Interrupted before any work was done: restore the flag and
            // send no notification.
            Thread.currentThread().interrupt();
            return;
        }
        synchronized (msg) {
            msg.setMsg("Notifier work done");
            msg.notify();
            System.out.println("Notifier thread sent notification at time: " + System.currentTimeMillis());
        }
    }
}