后端 · 2025年 1月 4日 0

java多线程同步

线程同步是个很困难的实现,因为并行涉及到不同资源对同一个资源的访问竞争,同一个资源在不同进程中的缓存速度影响,不同线程对抢夺资源的安排,等待,唤醒,顺序,等等等,属实需要大篇来讲解。

第一步:synchronized关键字和volatile关键字

synchronized 是 Java 中用于实现线程同步的关键字。它确保在多线程环境下对共享资源的访问是安全的,避免了数据不一致的问题。以下是 synchronized 的详细解析:

1. 基本概念和原理

  • 同步方法:通过在方法声明中使用 synchronized 关键字来实现。因为该关键字是java语言层面的实现所以不用捕获异常。也不用关心底层实现。
  • 同步代码块:通过在代码块中使用 synchronized 关键字来实现。理解synchronized块的工作原理对于确保线程安全至关重要。synchronized块的主要作用是互斥访问 ,即在同一时间只允许一个线程进入临界区(critical section)
  • 锁对象synchronized 关键字需要一个对象作为锁,这个对象可以是任何对象,但通常是当前对象(this)或某个特定的对象。
  • 互斥 (Mutual Exclusion)是指在同一时间只有一个线程能够访问某个资源或执行某个代码段。互斥的主要目的是防止多个线程同时修改共享数据,从而避免数据竞争(race condition)和不一致的问题。

确实,volatile关键字在Java中用于解决共享变量在线程间的可见性问题。下面是对volatile关键字的详细解释,包括其工作原理、适用场景以及与synchronized的区别。

什么是volatile关键字?

volatile关键字是一个特殊的修饰符,可以应用于变量上。当一个变量被声明为volatile时,它具有以下特性:

  1. 可见性:确保一个线程对volatile变量的修改对其他线程是立即可见的。
  2. 禁止指令重排序:防止编译器和处理器对volatile变量的读写操作进行重排序。

工作原理

可见性

在多线程环境下,每个线程都有自己的工作内存(也称为线程栈),当线程访问一个变量时,通常是从自己的工作内存中读取。而工作内存中的数据是从主内存(也称为堆内存)中复制过来的。如果没有特殊措施,一个线程对共享变量的修改不会立即反映到其他线程的工作内存中。

volatile关键字通过确保对volatile变量的读写操作直接在主内存中进行,从而保证了变量的可见性。具体来说:

  • 当一个线程修改了volatile变量的值时,这个修改会立即刷新到主内存。
  • 当其他线程需要读取这个volatile变量时,它们会从主内存中读取最新的值,而不是从各自的工作内存中读取旧值。

禁止指令重排序

编译器和处理器为了优化性能,可能会对代码进行指令重排序。例如,编译器可能会将两个不相关的操作交换顺序,以提高指令流水线的效率。然而,这种重排序可能会导致多线程环境下出现不可预测的行为。

volatile关键字通过禁止指令重排序来保证以下两点:

  • volatile变量的写操作不会被重排序到写操作之前。
  • volatile变量的读操作不会被重排序到读操作之后。

这确保了对volatile变量的操作按照程序中指定的顺序进行,避免了由于重排序导致的不一致性问题。

适用场景

volatile关键字适用于以下场景:

  1. 状态标志
    • 用于表示某个状态或标志位,例如线程是否应该继续运行。
    volatile boolean running = true;
  2. 单例模式
    • 在双重检查锁定(Double-Checked Locking)模式中,用于确保实例的可见性和一致性。
    private static volatile Singleton instance; public static Singleton getInstance() { if (instance == null) { synchronized (Singleton.class) { if (instance == null) { instance = new Singleton(); } } } return instance; }
  3. 配置参数
    • 用于存储配置参数,这些参数可能会在运行时被外部系统修改。
      java volatile int configValue;

synchronized的区别

虽然volatilesynchronized都可以用于解决多线程环境下的并发问题,但它们的作用和使用场景有所不同:

  1. volatile:
    • 可见性:确保变量的修改对所有线程立即可见。
    • 原子性:仅适用于单个变量的读写操作,不能保证复合操作的原子性。
    • 性能:开销较小,适用于对性能要求较高的场景。
    • 适用场景:适用于状态标志、配置参数等简单场景。
  2. synchronized:
    • 可见性:确保变量的修改对所有线程立即可见。
    • 原子性:保证复合操作的原子性,可以确保多个操作作为一个整体被执行。
    • 性能:开销较大,适用于对数据一致性要求较高的场景。
    • 适用场景:适用于需要保证复合操作原子性的复杂场景。

示例代码

以下是一个简单的示例,展示了volatile关键字的使用:

public class VolatileExample {
    private static volatile boolean running = true;

    public static void main(String[] args) throws InterruptedException {
        Thread workerThread = new Thread(() -> {
            int i = 0;
            while (running) {
                i++;
                System.out.println(i);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("Worker thread stopped.");
        });

        workerThread.start();

        Thread.sleep(2000); // 让workerThread运行2秒
        running = false; // 修改volatile变量
        System.out.println("Main thread set running to false.");
    }
}

在这个示例中,running变量被声明为volatile,确保主线程对running变量的修改能够立即对workerThread可见,从而使得workerThread能够及时停止运行。

总结

  • 可见性volatile确保对变量的修改对所有线程立即可见。
  • 禁止指令重排序volatile禁止对变量的读写操作进行重排序。
  • 适用场景:适用于状态标志、配置参数等简单场景。
  • synchronized的区别volatile仅保证可见性和禁止重排序,不保证复合操作的原子性;synchronized保证可见性、原子性和复合操作的顺序性。

2. 同步方法

让线程自己选择锁对象往往会使得代码逻辑混乱,也不利于封装。更好的方法是把synchronized逻辑封装起来。例如,我们编写一个计数器如下:

public class Counter {
    private int count = 0;

    public void add(int n) {
        synchronized(this) {
            count += n;
        }
    }

    public void dec(int n) {
        synchronized(this) {
            count -= n;
        }
    }

    public int get() {
        return count;
    }
}

这样一来,线程调用add()dec()方法时,它不必关心同步逻辑,因为synchronized代码块在add()dec()方法内部。并且,我们注意到,synchronized锁住的对象是this,即当前实例,这又使得创建多个Counter实例的时候,它们之间互不影响,可以并发执行:

var c1 = Counter();
var c2 = Counter();

// 对c1进行操作的线程:
new Thread(() -> {
    c1.add();
}).start();
new Thread(() -> {
    c1.dec();
}).start();

// 对c2进行操作的线程:
new Thread(() -> {
    c2.add();
}).start();
new Thread(() -> {
    c2.dec();
}).start();

现在,对于Counter类,多线程可以正确调用。

如果一个类被设计为允许多线程正确访问,我们就说这个类就是“线程安全”的(thread-safe),上面的Counter类就是线程安全的。Java标准库的java.lang.StringBuffer也是线程安全的。

还有一些不变类,例如StringIntegerLocalDate,它们的所有成员变量都是final,多线程同时访问时只能读不能写,这些不变类也是线程安全的。

最后,类似Math这些只提供静态方法,没有成员变量的类,也是线程安全的。

除了上述几种少数情况,大部分类,例如ArrayList,都是非线程安全的类,我们不能在多线程中修改它们。但是,如果所有线程都只读取,不写入,那么ArrayList是可以安全地在线程间共享的。

详解一下final对象为什么是线程安全的。

一些人可能会疑惑:String对象是线程安全的,因为是final的,对String对象更改不是更改String,而是新建一个String对象,然后指向他,这些对象存储在线程池中,但是同时多个线程同时对String s = “hello”的s修改,也不能保证线程安全,为什么会说这类final对象是线程安全的呢?

答:线程安全指的是String s = “hello”中的hello而不是s,安全的是hello而不是指向hello的引用s,引用不是线程安全的。

Java的线程锁是可重入的锁。

什么是可重入的锁?我们还是来看例子:

public class Counter {
    private int count = 0;

    public synchronized void add(int n) {
        if (n < 0) {
            dec(-n);
        } else {
            count += n;
        }
    }

    public synchronized void dec(int n) {
        count += n;
    }
}

观察synchronized修饰的add()方法,一旦线程执行到add()方法内部,说明它已经获取了当前实例的this锁。如果传入的n < 0,将在add()方法内部调用dec()方法。由于dec()方法也需要获取this锁,现在问题来了:

对同一个线程,能否在获取到锁以后继续获取同一个锁?

答案是肯定的。JVM允许同一个线程重复获取同一个锁,这种能被同一个线程反复获取的锁,就叫做可重入锁

由于Java的线程锁是可重入锁,所以,获取锁的时候,不但要判断是否是第一次获取,还要记录这是第几次获取。每获取一次锁,记录+1,每退出synchronized块,记录-1,减到0的时候,才会真正释放锁。

第二步:理解和避免死锁

死锁

一个线程可以获取一个锁后,再继续获取另一个锁。例如:

public void add(int m) {
    synchronized(lockA) { // 获得lockA的锁
        this.value += m;
        synchronized(lockB) { // 获得lockB的锁
            this.another += m;
        } // 释放lockB的锁
    } // 释放lockA的锁
}

public void dec(int m) {
    synchronized(lockB) { // 获得lockB的锁
        this.another -= m;
        synchronized(lockA) { // 获得lockA的锁
            this.value -= m;
        } // 释放lockA的锁
    } // 释放lockB的锁
}

在获取多个锁的时候,不同线程获取多个不同对象的锁可能导致死锁。对于上述代码,线程1和线程2如果分别执行add()dec()方法时:

  • 线程1:进入add(),获得lockA
  • 线程2:进入dec(),获得lockB

随后:

  • 线程1:准备获得lockB,失败,等待中;
  • 线程2:准备获得lockA,失败,等待中。

此时,两个线程各自持有不同的锁,然后各自试图获取对方手里的锁,造成了双方无限等待下去,这就是死锁。

死锁发生后,没有任何机制能解除死锁,只能强制结束JVM进程。

因此,在编写多线程应用时,要特别注意防止死锁。因为死锁一旦形成,就只能强制结束进程。

那么我们应该如何避免死锁呢?答案是:线程获取锁的顺序要一致。即严格按照先获取lockA,再获取lockB的顺序,改写dec()方法如下:

public void dec(int m) {
    synchronized(lockA) { // 获得lockA的锁
        this.value -= m;
        synchronized(lockB) { // 获得lockB的锁
            this.another -= m;
        } // 释放lockB的锁
    } // 释放lockA的锁
}

第三步:多线程协调

在Java程序中,synchronized解决了多线程竞争的问题。例如,对于一个任务管理器,多个线程同时往队列中添加任务,可以用synchronized加锁:

class TaskQueue {
    Queue<String> queue = new LinkedList<>();

    public synchronized void addTask(String s) {
        this.queue.add(s);
    }
}

但是synchronized并没有解决多线程协调的问题。

仍然以上面的TaskQueue为例,我们再编写一个getTask()方法取出队列的第一个任务:

class TaskQueue {
    Queue<String> queue = new LinkedList<>();

    public synchronized void addTask(String s) {
        this.queue.add(s);
    }

    public synchronized String getTask() {
        while (queue.isEmpty()) {
        }
        return queue.remove();
    }
}

上述代码看上去没有问题:getTask()内部先判断队列是否为空,如果为空,就循环等待,直到另一个线程往队列中放入了一个任务,while()循环退出,就可以返回队列的元素了。

但实际上while()循环永远不会退出。因为线程在执行while()循环时,已经在getTask()入口获取了this锁,其他线程根本无法调用addTask(),因为addTask()执行条件也是获取this锁。

因此,执行上述代码,线程会在getTask()中因为死循环而100%占用CPU资源。

如果深入思考一下,我们想要的执行效果是:

  • 线程1可以调用addTask()不断往队列中添加任务;
  • 线程2可以调用getTask()从队列中获取任务。如果队列为空,则getTask()应该等待,直到队列中至少有一个任务时再返回。

因此,多线程协调运行的原则就是:当条件不满足时,线程进入等待状态;当条件满足时,线程被唤醒,继续执行任务。

对于上述TaskQueue,我们先改造getTask()方法,在条件不满足时,线程进入等待状态:

public synchronized String getTask() {
    while (queue.isEmpty()) {
        this.wait();
    }
    return queue.remove();
}

当一个线程执行到getTask()方法内部的while循环时,它必定已经获取到了this锁,此时,线程执行while条件判断,如果条件成立(队列为空),线程将执行this.wait(),进入等待状态。

这里的关键是:wait()方法必须在当前获取的锁对象上调用,这里获取的是this锁,因此调用this.wait()

调用wait()方法后,线程进入等待状态,wait()方法不会返回,直到将来某个时刻,线程从等待状态被其他线程唤醒后,wait()方法才会返回,然后,继续执行下一条语句。

有些仔细的童鞋会指出:即使线程在getTask()内部等待,其他线程如果拿不到this锁,照样无法执行addTask(),肿么办?

这个问题的关键就在于wait()方法的执行机制非常复杂。首先,它不是一个普通的Java方法,而是定义在Object类的一个native方法,也就是由JVM的C代码实现的。其次,必须在synchronized块中才能调用wait()方法,因为wait()方法调用时,会释放线程获得的锁,wait()方法返回时,线程又会重新试图获得锁。

因此,只能在锁对象上调用wait()方法。因为在getTask()中,我们获得了this锁,因此,只能在this对象上调用wait()方法:

public synchronized String getTask() {
    while (queue.isEmpty()) {
        // 释放this锁:
        this.wait();
        // 重新获取this锁
    }
    return queue.remove();
}

当一个线程在this.wait()等待时,它就会释放this锁,从而使得其他线程能够在addTask()方法获得this锁。

现在我们面临第二个问题:如何让等待的线程被重新唤醒,然后从wait()方法返回?答案是在相同的锁对象上调用notify()方法。我们修改addTask()如下:

public synchronized void addTask(String s) {
    this.queue.add(s);
    this.notify(); // 唤醒在this锁等待的线程
}

注意到在往队列中添加了任务后,线程立刻对this锁对象调用notify()方法,这个方法会唤醒一个正在this锁等待的线程(就是在getTask()中位于this.wait()的线程),从而使得等待线程从this.wait()方法返回。

我们来看一个完整的例子:

import java.util.*;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        var q = new TaskQueue();
        var ts = new ArrayList<Thread>();
        for (int i=0; i<5; i++) {
            var t = new Thread() {
                public void run() {
                    // 执行task:
                    while (true) {
                        try {
                            String s = q.getTask();
                            System.out.println("execute task: " + s);
                        } catch (InterruptedException e) {
                            return;
                        }
                    }
                }
            };
            t.start();
            ts.add(t);
        }
        var add = new Thread(() -> {
            for (int i=0; i<10; i++) {
                // 放入task:
                String s = "t-" + Math.random();
                System.out.println("add task: " + s);
                q.addTask(s);
                try { Thread.sleep(100); } catch(InterruptedException e) {}
            }
        });
        add.start();
        add.join();
        Thread.sleep(100);
        for (var t : ts) {
            t.interrupt();
        }
    }
}

class TaskQueue {
    Queue<String> queue = new LinkedList<>();

    public synchronized void addTask(String s) {
        this.queue.add(s);
        this.notifyAll();
    }

    public synchronized String getTask() throws InterruptedException {
        while (queue.isEmpty()) {
            this.wait();
        }
        return queue.remove();
    }
}

这个例子中,我们重点关注addTask()方法,内部调用了this.notifyAll()而不是this.notify(),使用notifyAll()将唤醒所有当前正在this锁等待的线程,而notify()只会唤醒其中一个(具体哪个依赖操作系统,有一定的随机性)。这是因为可能有多个线程正在getTask()方法内部的wait()中等待,使用notifyAll()将一次性全部唤醒。通常来说,notifyAll()更安全。有些时候,如果我们的代码逻辑考虑不周,用notify()会导致只唤醒了一个线程,而其他线程可能永远等待下去醒不过来了。

但是,注意到wait()方法返回时需要重新获得this锁。假设当前有3个线程被唤醒,唤醒后,首先要等待执行addTask()的线程结束此方法后,才能释放this锁,随后,这3个线程中只能有一个获取到this锁,剩下两个将继续等待。

再注意到我们在while()循环中调用wait(),而不是if语句:

public synchronized String getTask() throws InterruptedException {
    if (queue.isEmpty()) {
        this.wait();
    }
    return queue.remove();
}

这种写法实际上是错误的,因为线程被唤醒时,需要再次获取this锁。多个线程被唤醒后,只有一个线程能获取this锁,此刻,该线程执行queue.remove()可以获取到队列的元素,然而,剩下的线程如果获取this锁后执行queue.remove(),此刻队列可能已经没有任何元素了,所以,要始终在while循环中wait(),并且每次被唤醒后拿到this锁就必须再次判断:

while (queue.isEmpty()) {
    this.wait();
}

从Java 5开始,引入了一个高级的处理并发的java.util.concurrent包,它提供了大量更高级的并发功能,能大大简化多线程程序的编写。

我们知道Java语言直接提供了synchronized关键字用于加锁,但这种锁一是很重,二是获取时必须一直等待,没有额外的尝试机制。

java.util.concurrent.locks包提供的ReentrantLock用于替代synchronized加锁,我们来看一下传统的synchronized代码:

public class Counter {
    private int count;

    public void add(int n) {
        synchronized(this) {
            count += n;
        }
    }
}

如果用ReentrantLock替代,可以把代码改造为:

public class Counter {
    private final Lock lock = new ReentrantLock();
    private int count;

    public void add(int n) {
        lock.lock();
        try {
            count += n;
        } finally {
            lock.unlock();
        }
    }
}

因为synchronized是Java语言层面提供的语法,所以我们不需要考虑异常,而ReentrantLock是Java代码实现的锁,我们就必须先获取锁,然后在finally中正确释放锁。

顾名思义,ReentrantLock是可重入锁,它和synchronized一样,一个线程可以多次获取同一个锁。

synchronized不同的是,ReentrantLock可以尝试获取锁:

if (lock.tryLock(1, TimeUnit.SECONDS)) {
    try {
        ...
    } finally {
        lock.unlock();
    }
}

上述代码在尝试获取锁的时候,最多等待1秒。如果1秒后仍未获取到锁,tryLock()返回false,程序就可以做一些额外处理,而不是无限等待下去。

所以,使用ReentrantLock比直接使用synchronized更安全,线程在tryLock()失败的时候不会导致死锁。

使用ReentrantLock比直接使用synchronized更安全,可以替代synchronized进行线程同步。

但是,synchronized可以配合waitnotify实现线程在条件不满足时等待,条件满足时唤醒,用ReentrantLock我们怎么编写waitnotify的功能呢?

答案是使用Condition对象来实现waitnotify的功能。

我们仍然以TaskQueue为例,把前面用synchronized实现的功能通过ReentrantLockCondition来实现:

class TaskQueue {
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private Queue<String> queue = new LinkedList<>();

    public void addTask(String s) {
        lock.lock();
        try {
            queue.add(s);
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public String getTask() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                condition.await();
            }
            return queue.remove();
        } finally {
            lock.unlock();
        }
    }
}

可见,使用Condition时,引用的Condition对象必须从Lock实例的newCondition()返回,这样才能获得一个绑定了Lock实例的Condition实例。

Condition提供的await()signal()signalAll()原理和synchronized锁对象的wait()notify()notifyAll()是一致的,并且其行为也是一样的:

  • await()会释放当前锁,进入等待状态;
  • signal()会唤醒某个等待线程;
  • signalAll()会唤醒所有等待线程;
  • 唤醒线程从await()返回后需要重新获得锁。

此外,和tryLock()类似,await()可以在等待指定时间后,如果还没有被其他线程通过signal()signalAll()唤醒,可以自己醒来:

if (condition.await(1, TimeUnit.SECOND)) {
    // 被其他线程唤醒
} else {
    // 指定时间内没有被其他线程唤醒
}

可见,使用Condition配合Lock,我们可以实现更灵活的线程同步。

第四步:多线程效率

前面讲到的ReentrantLock保证了只有一个线程可以执行临界区代码:

public class Counter {
    private final Lock lock = new ReentrantLock();
    private int[] counts = new int[10];

    public void inc(int index) {
        lock.lock();
        try {
            counts[index] += 1;
        } finally {
            lock.unlock();
        }
    }

    public int[] get() {
        lock.lock();
        try {
            return Arrays.copyOf(counts, counts.length);
        } finally {
            lock.unlock();
        }
    }
}

但是有些时候,这种保护有点过头。因为我们发现,任何时刻,只允许一个线程修改,也就是调用inc()方法是必须获取锁,但是,get()方法只读取数据,不修改数据,它实际上允许多个线程同时调用。

实际上我们想要的是:允许多个线程同时读,但只要有一个线程在写,其他线程就必须等待:

允许不允许
不允许不允许

使用ReadWriteLock可以解决这个问题,它保证:

  • 只允许一个线程写入(其他线程既不能写入也不能读取);
  • 没有写入时,多个线程允许同时读(提高性能)。

ReadWriteLock实现这个功能十分容易。我们需要创建一个ReadWriteLock实例,然后分别获取读锁和写锁:

public class Counter {
    private final ReadWriteLock rwlock = new ReentrantReadWriteLock();
    // 注意: 一对读锁和写锁必须从同一个rwlock获取:
    private final Lock rlock = rwlock.readLock();
    private final Lock wlock = rwlock.writeLock();
    private int[] counts = new int[10];

    public void inc(int index) {
        wlock.lock(); // 加写锁
        try {
            counts[index] += 1;
        } finally {
            wlock.unlock(); // 释放写锁
        }
    }

    public int[] get() {
        rlock.lock(); // 加读锁
        try {
            return Arrays.copyOf(counts, counts.length);
        } finally {
            rlock.unlock(); // 释放读锁
        }
    }
}

把读写操作分别用读锁和写锁来加锁,在读取时,多个线程可以同时获得读锁,这样就大大提高了并发读的执行效率。

使用ReadWriteLock时,适用条件是同一个数据,有大量线程读取,但仅有少数线程修改。

例如,一个论坛的帖子,回复可以看做写入操作,它是不频繁的,但是,浏览可以看做读取操作,是非常频繁的,这种情况就可以使用ReadWriteLock

前面介绍的ReadWriteLock可以解决多线程同时读,但只有一个线程能写的问题。

如果我们深入分析ReadWriteLock,会发现它有个潜在的问题:如果有线程正在读,写线程需要等待读线程释放锁后才能获取写锁,即读的过程中不允许写,这是一种悲观的读锁。

要进一步提升并发执行效率,Java 8引入了新的读写锁:StampedLock

StampedLockReadWriteLock相比,改进之处在于:读的过程中也允许获取写锁后写入!这样一来,我们读的数据就可能不一致,所以,需要一点额外的代码来判断读的过程中是否有写入,这种读锁是一种乐观锁。

乐观锁的意思就是乐观地估计读的过程中大概率不会有写入,因此被称为乐观锁。反过来,悲观锁则是读的过程中拒绝有写入,也就是写入必须等待。显然乐观锁的并发效率更高,但一旦有小概率的写入导致读取的数据不一致,需要能检测出来,再读一遍就行。

我们来看例子:

public class Point {
    private final StampedLock stampedLock = new StampedLock();

    private double x;
    private double y;

    public void move(double deltaX, double deltaY) {
        long stamp = stampedLock.writeLock(); // 获取写锁
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            stampedLock.unlockWrite(stamp); // 释放写锁
        }
    }

    public double distanceFromOrigin() {
        long stamp = stampedLock.tryOptimisticRead(); // 获得一个乐观读锁
        // 注意下面两行代码不是原子操作
        // 假设x,y = (100,200)
        double currentX = x;
        // 此处已读取到x=100,但x,y可能被写线程修改为(300,400)
        double currentY = y;
        // 此处已读取到y,如果没有写入,读取是正确的(100,200)
        // 如果有写入,读取是错误的(100,400)
        if (!stampedLock.validate(stamp)) { // 检查乐观读锁后是否有其他写锁发生
            stamp = stampedLock.readLock(); // 获取一个悲观读锁
            try {
                currentX = x;
                currentY = y;
            } finally {
                stampedLock.unlockRead(stamp); // 释放悲观读锁
            }
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }
}

ReadWriteLock相比,写入的加锁是完全一样的,不同的是读取。注意到首先我们通过tryOptimisticRead()获取一个乐观读锁,并返回版本号。接着进行读取,读取完成后,我们通过validate()去验证版本号,如果在读取过程中没有写入,版本号不变,验证成功,我们就可以放心地继续后续操作。如果在读取过程中有写入,版本号会发生变化,验证将失败。在失败的时候,我们再通过获取悲观读锁再次读取。由于写入的概率不高,程序在绝大部分情况下可以通过乐观读锁获取数据,极少数情况下使用悲观读锁获取数据。

可见,StampedLock把读锁细分为乐观读和悲观读,能进一步提升并发效率。但这也是有代价的:一是代码更加复杂,二是StampedLock是不可重入锁,不能在一个线程中反复获取同一个锁。

StampedLock还提供了更复杂的将悲观读锁升级为写锁的功能,它主要使用在if-then-update的场景:即先读,如果读的数据满足条件,就返回,如果读的数据不满足条件,再尝试写。

第五步:多线程线程数限制

前面我们讲了各种锁的实现,本质上锁的目的是保护一种受限资源,保证同一时刻只有一个线程能访问(ReentrantLock),或者只有一个线程能写入(ReadWriteLock)。

还有一种受限资源,它需要保证同一时刻最多有N个线程能访问,比如同一时刻最多创建100个数据库连接,最多允许10个用户下载等。

这种限制数量的锁,如果用Lock数组来实现,就太麻烦了。

这种情况就可以使用Semaphore,例如,最多允许3个线程同时访问:

public class AccessLimitControl {
    // 任意时刻仅允许最多3个线程获取许可:
    final Semaphore semaphore = new Semaphore(3);

    public String access() throws Exception {
        // 如果超过了许可数量,其他线程将在此等待:
        semaphore.acquire();
        try {
            // TODO:
            return UUID.randomUUID().toString();
        } finally {
            semaphore.release();
        }
    }
}

使用Semaphore先调用acquire()获取,然后通过try ... finally保证在finally中释放。

调用acquire()可能会进入等待,直到满足条件为止。也可以使用tryAcquire()指定等待时间:

if (semaphore.tryAcquire(3, TimeUnit.SECONDS)) {
    // 指定等待时间3秒内获取到许可:
    try {
        // TODO:
    } finally {
        semaphore.release();
    }
}

Semaphore本质上就是一个信号计数器,用于限制同一时间的最大访问数量。

第六步:多线程实用类和原子操作

我们在前面已经通过ReentrantLockCondition实现了一个BlockingQueue

public class TaskQueue {
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private Queue<String> queue = new LinkedList<>();

    public void addTask(String s) {
        lock.lock();
        try {
            queue.add(s);
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public String getTask() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                condition.await();
            }
            return queue.remove();
        } finally {
            lock.unlock();
        }
    }
}

BlockingQueue的意思就是说,当一个线程调用这个TaskQueuegetTask()方法时,该方法内部可能会让线程变成等待状态,直到队列条件满足不为空,线程被唤醒后,getTask()方法才会返回。

因为BlockingQueue非常有用,所以我们不必自己编写,可以直接使用Java标准库的java.util.concurrent包提供的线程安全的集合:ArrayBlockingQueue

除了BlockingQueue外,针对ListMapSetDeque等,java.util.concurrent包也提供了对应的并发集合类。我们归纳一下:

interfacenon-thread-safethread-safe
ListArrayListCopyOnWriteArrayList
MapHashMapConcurrentHashMap
SetHashSet / TreeSetCopyOnWriteArraySet
QueueArrayDeque / LinkedListArrayBlockingQueue / LinkedBlockingQueue
DequeArrayDeque / LinkedListLinkedBlockingDeque

使用这些并发集合与使用非线程安全的集合类完全相同。我们以ConcurrentHashMap为例:

Map<String, String> map = new ConcurrentHashMap<>();
// 在不同的线程读写:
map.put("A", "1");
map.put("B", "2");
map.get("A", "1");

因为所有的同步和加锁的逻辑都在集合内部实现,对外部调用者来说,只需要正常按接口引用,其他代码和原来的非线程安全代码完全一样。即当我们需要多线程访问时,把:

Map<String, String> map = new HashMap<>();

改为:

Map<String, String> map = new ConcurrentHashMap<>();

就可以了。

java.util.Collections工具类还提供了一个旧的线程安全集合转换器,可以这么用:

Map unsafeMap = new HashMap();
Map threadSafeMap = Collections.synchronizedMap(unsafeMap);

但是它实际上是用一个包装类包装了非线程安全的Map,然后对所有读写方法都用synchronized加锁,这样获得的线程安全集合的性能比java.util.concurrent集合要低很多,所以不推荐使用。

线程安全的集合类

  1. java.util.concurrent
    • ConcurrentHashMap :线程安全的哈希表实现,支持高并发访问。
    • CopyOnWriteArrayList :线程安全的列表实现,适用于读多写少的场景。
    • CopyOnWriteArraySet :基于CopyOnWriteArrayList的线程安全集合。
    • ConcurrentLinkedQueue :线程安全的无界非阻塞队列。
    • BlockingQueue接口及其实现
      • ArrayBlockingQueue :有界阻塞队列。
      • LinkedBlockingQueue :可选有界的阻塞队列。
      • PriorityBlockingQueue :无界阻塞优先级队列。
      • DelayQueue :无界阻塞延迟队列。
      • SynchronousQueue :不存储元素的阻塞队列。
    • ConcurrentSkipListMap :线程安全的跳跃表实现,支持高效的并发访问。
    • ConcurrentSkipListSet :基于ConcurrentSkipListMap的线程安全集合。
  2. java.util.Collections工具类
    • Collections.synchronizedList(List<T> list) :返回指定列表的同步(线程安全)包装器。
    • Collections.synchronizedSet(Set<T> s) :返回指定集合的同步(线程安全)包装器。
    • Collections.synchronizedMap(Map<K,V> m) :返回指定映射的同步(线程安全)包装器。
    • Collections.synchronizedSortedSet(SortedSet<T> s) :返回指定排序集合的同步(线程安全)包装器。
    • Collections.synchronizedSortedMap(SortedMap<K,V> m) :返回指定排序映射的同步(线程安全)包装器。

其他线程安全的类

  1. java.lang
    • String :虽然String对象本身是不可变的,但引用是可变的,需要额外的同步机制来确保引用的安全性。
    • StringBuilder StringBuffer
      • StringBuffer :线程安全的可变字符序列。
      • StringBuilder :非线程安全的可变字符序列,性能更高,适用于单线程环境。
  2. java.util.concurrent.atomic
    • AtomicInteger :提供原子操作的整数类。
    • AtomicLong :提供原子操作的长整数类。
    • AtomicBoolean :提供原子操作的布尔类。
    • AtomicReference<V> :提供原子操作的对象引用。
    • AtomicIntegerArray :提供原子操作的整数数组。
    • AtomicLongArray :提供原子操作的长整数数组。
    • AtomicReferenceArray<V> :提供原子操作的对象数组引用。
    • AtomicIntegerFieldUpdater<T> :提供对指定类的volatile int字段的原子更新。
    • AtomicLongFieldUpdater<T> :提供对指定类的volatile long字段的原子更新。
    • AtomicReferenceFieldUpdater<T,V> :提供对指定类的volatile reference字段的原子更新。
  3. java.util.concurrent.locks
    • ReentrantLock :可重入的互斥锁。
    • ReadWriteLock接口及其实现
      • ReentrantReadWriteLock :支持读写锁的实现。
    • Condition接口 :与ReentrantLock一起使用,提供更灵活的线程通信机制。
  4. java.util.concurrent包中的其他类
    • ExecutorService接口及其实现
      • ThreadPoolExecutor :线程池的实现。
      • ScheduledThreadPoolExecutor :支持定时任务的线程池实现。
    • Future接口及其实现
      • FutureTask :表示异步计算的结果。
    • CompletableFuture :提供更强大的异步编程支持。

Java的java.util.concurrent包除了提供底层锁、并发集合外,还提供了一组原子操作的封装类,它们位于java.util.concurrent.atomic包。

我们以AtomicInteger为例,它提供的主要操作有:

  • 增加值并返回新值:int addAndGet(int delta)
  • 加1后返回新值:int incrementAndGet()
  • 获取当前值:int get()
  • 用CAS方式设置:int compareAndSet(int expect, int update)

Atomic类是通过无锁(lock-free)的方式实现的线程安全(thread-safe)访问。它的主要原理是利用了CAS:Compare and Set。

如果我们自己通过CAS编写incrementAndGet(),它大概长这样:

public int incrementAndGet(AtomicInteger var) {
    int prev, next;
    do {
        prev = var.get();
        next = prev + 1;
    } while ( ! var.compareAndSet(prev, next));
    return next;
}

CAS是指,在这个操作中,如果AtomicInteger的当前值是prev,那么就更新为next,返回true。如果AtomicInteger的当前值不是prev,就什么也不干,返回false。通过CAS操作并配合do ... while循环,即使其他线程修改了AtomicInteger的值,最终的结果也是正确的。

我们利用AtomicLong可以编写一个多线程安全的全局唯一ID生成器:

class IdGenerator {
    AtomicLong var = new AtomicLong(0);

    public long getNextId() {
        return var.incrementAndGet();
    }
}

通常情况下,我们并不需要直接用do ... while循环调用compareAndSet实现复杂的并发操作,而是用incrementAndGet()这样的封装好的方法,因此,使用起来非常简单。

在高度竞争的情况下,还可以使用Java 8提供的LongAdderLongAccumulator