Skip to content

线程间的通讯

  • 线程间的通讯是指多个线程之间的信息交换。
  • 线程间的通讯主要有两种方式:共享内存和消息传递。
    • 共享内存是指多个线程共享同一块内存区域,线程之间通过这块内存区域来交换信息。
    • 可以使用 volatile 关键字可以保证可见性,但是不能保证原子性。
      • 共享内存的示例代码:
Java
import java.util.concurrent.atomic.AtomicInteger;

public class SharedMemoryDemo {
    private static final AtomicInteger count = new AtomicInteger(0);
    public static void main(String[] args) {
        Thread thread1 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                count.incrementAndGet();
            }
        });
        Thread thread2 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                count.incrementAndGet();
            }
        });
        thread1.start();
        thread2.start();
        try {
            thread1.join();
            thread2.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        System.out.println(count);
    }
}
  • 消息传递是指线程之间通过消息传递来交换信息,消息传递是一种更加安全的通讯方式,因为消息传递是通过操作系统来实现的,操作系统会保证消息的正确传递。
    • 消息传递的示例代码:
Java
package io.alex.concurrent;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueExample {
    public static void main(String[] args) {
        // 创建一个容量为5的阻塞队列
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(5);

        // 创建生产者和消费者,将阻塞队列传入
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);

        new Thread(producer).start();
        new Thread(consumer).start();
    }
}

class Producer implements Runnable {
    private final BlockingQueue<String> queue;

    public Producer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                String message = "Message " + i;
                queue.put(message);
                System.out.println("Produced: " + message);
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class Consumer implements Runnable {
    private final BlockingQueue<String> queue;

    public Consumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                String message = queue.poll();
                System.out.println("Consumed: " + message);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
  • 使用 wait 和 notify
Java
class Message {
    private String msg;

    public Message(String str){
        this.msg = str;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String str) {
        this.msg = str;
    }
}

class WaitNotifyExample {
    public static void main(String[] args) {
        Message msg = new Message("process it");
        Waiter waiter = new Waiter(msg);
        new Thread(waiter, "waiter").start();

        Notifier notifier = new Notifier(msg);
        new Thread(notifier, "notifier").start();

        System.out.println("All the threads are started");
    }
}

class Waiter implements Runnable{
    private Message msg;

    public Waiter(Message m){
        this.msg = m;
    }

    @Override
    public void run() {
        synchronized (msg) {
            try{
                System.out.println("Waiting for notify at time: " + System.currentTimeMillis());
                msg.wait();
            }catch(InterruptedException e){
                e.printStackTrace();
            }
            System.out.println("Waiter thread got notified at time: " + System.currentTimeMillis());
            System.out.println("Processed: " + msg.getMsg());
        }
    }
}

class Notifier implements Runnable {
    private Message msg;

    public Notifier(Message msg) {
        this.msg = msg;
    }

    @Override
    public void run() {
        synchronized (msg) {
            try {
                Thread.sleep(1000);
                msg.setMsg("Notifier work done");
                msg.notify();
                System.out.println("Notifier thread sent notification at time: " + System.currentTimeMillis());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}