并发Concurrent与并行Parallel的区别?
并发是同一时间段内处理多个任务,但不是同时执行,可以通过时间片轮转在单核CPU上实现,也就是说并发是逻辑上的同时进行。
并行则是真正同时执行多个任务,必须有多核处理器或者多个处理器来支持,并行是物理上的同时进行。
Java内存模型 Java Memory Model
内存模型的结构包括:
- 主内存main memory 所有线程共享的内存区域,存储所有变量的主副本。用于存储实例字段和静态字段,是变量的唯一主副本。
- 工作内存working memory 每个线程私有的内存区域,存储主内存中变量的副本。线程之间互相隔离,负责存储变量的副本,线程直接操作工作内存。
内存的交互操作包括:
- 锁定 lock: 作用于主内存变量,将其标记为线程私有
- 解锁 unlock: 作用于主内存变量,将其释放锁定状态
- 读取 read: 从主内存读取变量到工作内存缓冲区
- 载入 load:将read的值放入工作内存的变量副本
- 使用 use: 将工作内存变量值传递给执行引擎
- 赋值 assign: 将执行引擎的值传递给工作内存变量
- 存储 store: 将工作内存变量值传送到主内存(先存储在工作内存缓冲区)
- 写入 write: 将store的值写入主内存变量
之所以采取这种 read+load和store+write的原子操作组合,是为了避免频繁修改导致的频繁访存,同时也能够很好的提供精确控制,避免死锁,保证一致性。
常见的线程安全问题
竞态条件: 多个线程同时访问和修改共享数据,最终结果依赖于线程执行的时序
数据竞争: 多个线程同时访问同一内存位置,且至少有一个是写操作,没有适当的同步
死锁: 两个或者多个线程互相等待对方释放资源,导致程序永久阻塞
并发编程的三大特性
可见性: 当一个线程修改了共享变量的值,其他线程能够立即可见这一修改
原子性: 一个操作或者一组操作要么全部执行完成,要么全部不完成,不会被其他线程打断
有序性: 程序执行的顺序按照代码的先后顺序执行,但编译器和处理器可能会进行指令重排序优化。
关于线程的创建
| 特性 | Thread 类 | Runnable 接口 | Callable 接口 |
|---|---|---|---|
| 包 | java.lang |
java.lang |
java.util.concurrent |
| 方法 | run() |
run() |
call() |
| 返回值 | 无 (void) |
无 (void) |
有 (泛型类型) |
| 异常 | 不能抛出受检异常 | 不能抛出受检异常 | 可以抛出受检异常 |
| 使用方式 | 继承 Thread 类 | 实现 Runnable 接口 | 实现 Callable 接口 |
| 执行方式 | thread.start() |
new Thread(runnable).start() |
executor.submit(callable) |
| 结果获取 | 无直接方式 | 无直接方式 | 通过 Future 对象获取 |
| 适用场景 | 简单的线程任务 | 任务与执行分离,资源共享 | 需要返回结果、异常处理的复杂任务 |
通常来说,可以使用lambda表达式来简化Runnable和Callable接口创建线程的语法。
public class LambdaThreadExample {
public static void main(String[] args) {
// 使用Lambda表达式创建线程
Thread thread1 = new Thread( () -> {
for (int i = 0; i < 5; i++) {
System.out.println("Lambda线程: " + i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} );
// 使用方法引用
Thread thread2 = new Thread(LambdaThreadExample::printNumbers);
thread1.start();
thread2.start();
}
public static void printNumbers() {
for (int i = 0; i < 5; i++) {
System.out.println("方法引用线程: " + i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
关于线程的状态、优先级与守护线程Daemon
线程的状态包括:
- NEW: 线程对象已经创建但尚未调用
Start()方法 - RUNNABLE: 线程正在JVM中执行
- BLOCKED: 线程被阻塞以等待监视器锁
- WAITING: 线程无限期等待另一个线程执行特定操作
- TIMED_WAITING: 线程等待指定的时间后自动返回
- TERMINATED:线程执行完毕或者异常退出
线程的优先级包括:
- MIN_PRIORITY(1)
- NORM_PRIORITY(5)
- MAX_PRIORITY(10)
守护线程是为其他线程提供服务的,必须在.start()之前进行设置.setDaemon(true)。当所有非守护线程结束时,JVM会退出。比如垃圾回收器GC就是典型的守护线程。
关于Synchronized
synchronized是最基本的同步机制,基于监视器锁Monitor Lock实现。每个Java对象都有一个内置的监视器锁,当线程进入到synchronized代码块或者方法时,就会自动获取这个锁。
该关键字保证了同一时刻只有一个线程可以执行被同步的代码。
其机制包括
- 获取锁: 线程进入synchronized代码块时,会尝试获取监视器锁,如果锁可用,则获取锁并继续执行
- 等待锁: 如果监视器锁被其他线程持有,当前线程会被阻塞,进入等待状态,直到锁被释放
- 释放锁: 线程执行完synchronized代码块后,会自动释放锁,唤醒等待的线程
synchronized可以修饰实例方法、静态方法和代码块,修饰的对象不同,获取的锁也不同。
对象锁(实例锁)
每个对象实例都有一个锁,不同对象实例的锁是完全独立的。
// 实例方法同步
public synchronized void instanceMethod() {
// 获取当前对象的锁
}
// 代码块同步
public void method() {
sychronized(this) {
// 获取当前对象的锁
}
}
类锁(静态锁)
每个类的Class对象都有一个锁,因为每个类只有一个Class对象,所以是全局唯一的。
// 静态方法同步
public static synchronized void staticMethod() {
// 获取类的Class对象的锁
}
// Class对象同步
public void method() {
synchronized(MyClass.class) {
// 获取类的Class对象的锁
}
}
⚠️注意
对象锁和类锁是完全独立的,一个线程获取了对象锁,不会阻止其他线程获取类锁。
死锁的条件
- 互斥条件:资源不能被多个线程使用
- 持有和等待:线程持有资源的同时在等待其他资源
- 不可剥夺:资源不能强制从线程手中剥夺
- 循环等待:存在线程资源的循环等待链
死锁预防策略
- 锁排序:所有线程按照相同顺序获得锁
- 锁超时:使用
tryLock()设置获取锁的超时时间 - 死锁检测:定期检测死锁并采取恢复措施
- 避免嵌套锁:避免在持有锁的情况下获取锁
synchronized优化策略
减小锁粒度
// 不推荐 粗粒度锁
synchronized(this) {
update1();
update2();
}
// 细粒度锁
synchronized(lock1) { update1(); }
synchronized(lock2) { update2(); }
读写锁分离
ReadWriteLock rwLock = new ReentrantReadWriteLock();
Lock readLock = rwLock.readLock();
Lock writeLock = rwLock.writeLock();
由于JIT编译器本身的优化,在对象不存在逃逸(不会被其他线程访问)的情况,JVM会消除锁操作。
关于volatile
volatile关键字提供了比synchronized更轻量级的同步机制,主要是解决变量在多线程环境下的可见性问题。
- 可见性保证:当一个线程修改volatile变量时,其他线程都立即可见这个操作
- 禁止重排序:编译器和处理器不会对volatile变量的读写进行重排序,保证程序执行的有序性
- 非原子性:对volatile变量的复合操作不保证原子性,如
i++这样的操作仍然需要额外的同步机制
volatile实现机制
当一个变量被声明为volatile时,JVM会在适当的地方插入内存屏障指令,保证:
- 写操作立即刷新到主内存,并使得其他CPU缓存失效
- 读操作直接从主内存中读取最新值,不适用缓存
- 防止指令重排序
关于内存屏障
内存屏障是一种CPU指令,用于控制特定条件下的内存操作顺序和可见性,包括四种:LoadLoad屏障,LoadStore屏障,StoreLoad屏障(开销最大),StoreStore屏障。其作用可以简略表述为:保证只有屏障前的操作执行完后,才能执行屏障后的操作。
// 写操作的内存屏障
StoreStore
volatile Write
StoreLoad
// 读操作的内存屏障
volatile Read
LoadLoad
LoadStore
关于happens-before规则
我们说操作A happens-before 操作B,意思是操作A的结果对于操作B来说是可见的。这里happens-before原则描述的是内存可见性,而不是时间关系。
对比synchronized与volatile
| volatile | synchronized |
|---|---|
| 轻量级同步机制 | 重量级同步机制 |
| 可见性,有序性 | 可见性,有序性,原子性 |
| 不保证原子性 | 互斥访问 |
| 无阻塞特性 | 可能会导致线程阻塞 |
| 性能开销小 | 性能开销大 |
关于Lock
Lock接口提供了比synchronized关键字更灵活、更强大的锁机制,为synchronized无法满足需求时,提供了更多的选择,其核心方法包括:
public interface Lock {
// 获取锁,如果锁不可用则阻塞等待
void lock();
// 可中断锁获取
void lockInterruptibly() throws InterruptedException;
// 尝试获取锁,立即返回结果
boolean tryLock();
// 在指定时间内尝试获取锁
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 释放锁
void unlock();
// 创建条件变量
Condition newCondition();
}
ReentrantLock
ReentrantLock是可重入的互斥锁,可重入 意味着同一个线程可以多次获取同一把锁。
注意,与synchronized能够自动释放锁不同,使用Lock时必须在finally块中释放锁lock.unlock(),否则会导致死锁。
公平锁fair lock
按照线程请求锁的顺序来分配锁,先到先得。保证了公平性,但是性能较低。
非公平锁unfair lock
不保证线程获取锁的顺序,新来的线程可能比等待中的线程更早获得锁。性能更好,但容易导致线程饥饿。
可中断锁
等待获取锁的线程(尚未获得锁而被阻塞的线程)可以被中断,并抛出InterruptedException。
Lock lock = new ReentrantLock();
// 传统方式 - 不可中断
lock.lock(); // 如果锁被占用,线程会一直阻塞,无法被中断
// 可中断方式
lock.lockInterruptibly(); // 如果锁被占用,线程在等待时可以被其他线程中断
超时锁
可以指定线程对该锁的最长等待时间,成功时返回true,失败时返回false,避免死锁风险。
Lock lock = new ReentrantLock();
// 尝试获取锁,最多等待指定时间
if (lock.tryLock(3, TimeUnit.SECONDS)) {
try {
// 获取锁成功,执行临界区代码
} finally {
lock.unlock();
}
} else {
// 超时未获得锁,执行备用方案
}
关于读写锁
读写锁ReadWriteLock是一种特殊的锁机制,允许多个进程同时持有读锁,但只有一个进程可以获取写锁,读锁和写锁互斥,写锁与写锁互斥。这一设计是基于读操作数量更多且不会造成数据竞争。
ReentrantReadWriteLock
在Java中读写锁接口的标准实现是ReentrantReadWriteLock,该类提供了可重入的读写锁功能,类内提供了一个读锁和一个写锁,它们共享一个同步状态。同样支持公平锁和非公平锁两种模式,并支持tryLock()方法可以设置超时时间。
ReentrantReadWriteLock支持锁降级(持有写锁时可以获取读锁,然后将写锁释放,从写锁降级为读锁),为了避免死锁,不支持锁升级(即从持有读锁升级为写锁,防止多个持有读锁的进程尝试升级)。
这种方式也保证了数据的一致性,即数据更新后线程可以立刻读取最新的数据,避免释放写锁后其他线程修改数据。
ReadWriteLock rwLock = new ReentrantReadWriteLock();
// 写锁降级是安全的
rwLock.writeLock().lock();
try {
// 修改数据...
// 在释放写锁前获取读锁
rwLock.readLock().lock(); // 降级开始
} finally {
rwLock.writeLock().unlock(); // 降级完成
}
// 现在只有读锁,其他线程可以并发读取
StampedLock
在Java 8中引入的新特性,提供了写锁、悲观读锁和乐观读三种模式,相比ReentrantReadWriteLock,在读多写少的情况下性能更优。
- 写锁
独占锁,类似于
ReentrantReadWriteLock的写锁,但不允许重入 - 悲观读锁 共享锁,同样的,不允许重入
- 乐观读 不是采用锁,而是采用版本号来检查数据是否被修改,性能更好
关于Condition接口
Condition接口必须配合Lock使用,提供了比Object.wait/notify更强大和灵活的线程间通信制度。Condition允许线程在特定条件下等待,并在满足条件时被唤醒。
主要方法包括:
-
await() 使当前线程等待,直到被signal或者interrupt
-
signal() 唤醒一个等待的线程
-
signalAll()
唤醒所有等待的线程。
-
awaitUntil(Date ddl) 等待到指定的时间点
-
awaitUninterruptibly() 设置不可中断的等待
一个示例如下:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionBasic {
private final ReentrantLock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private boolean ready = false;
public void waitForReady() throws InterruptedException {
lock.lock();
try {
while (!ready) {
System.out.println(Thread.currentThread().getName() + " waits for condition");
condition.await(); // 等待条件
}
System.out.println(Thread.currentThread().getName() + " Condition is ready, execute now");
} finally {
lock.unlock();
}
}
public void setReady() {
lock.lock();
try {
ready = true;
System.out.println("Condition is ready");
condition.signalAll(); // 唤醒所有等待的线程
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
ConditionBasic example = new ConditionBasic();
// 启动等待线程
Thread waiter1 = new Thread(() -> {
try {
example.waitForReady();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Waiter-1");
Thread waiter2 = new Thread(() -> {
try {
example.waitForReady();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Waiter-2");
waiter1.start();
waiter2.start();
// 等待2秒后设置条件
Thread.sleep(2000);
example.setReady();
waiter1.join();
waiter2.join();
}
}
生产者消费者模式实现
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class BoundedBuffer {
private final Object[] buffer;
private int putIndex = 0;
private int takeIndex = 0;
private int count = 0;
private final ReentrantLock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public BoundedBuffer(int capacity) {
buffer = new Object[capacity];
}
// 用于生产者
public void put(T item) throws InterruptedException {
lock.lock();
try {
// 等待缓冲区的空余空间
while (count == buffer.length) {
System.out.println(Thread.currentThread().getName() + " wait for buffer space");
notFull.await(); // await总是要放在while循环里
}
buffer[putIndex] = item;
putIndex = (putIndex + 1) % buffer.length;
count++;
// 打印生产信息
System.out.println(Thread.currentThread().getName() + " produced: " + item + ", now buffered: " + count);
// 通知消费者
notEmpty.signal();
} finally {
lock.unlock(); // 不要忘记释放锁
}
}
// 用于消费者
@SuppressWarnings("unchecked")
public T take() throws InterruptedException {
lock.lock();
try {
// 等待缓冲非空
while (count == 0) {
System.out.println(Thread.currentThread().getName() + " wait for non-empty buffer space");
notEmpty.await();
}
T item = (T) buffer[takeIndex];
buffer[takeIndex] = null;
takeIndex = (takeIndex + 1) % buffer.length;
count--;
// 打印消费信息
System.out.println(Thread.currentThread().getName() + " consumed: " + item + ", now buffered: " + count);
// 通知生产者
notFull.signal();
return item;
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
BoundedBuffer buffer = new BoundedBuffer<>(3);
// 生产者线程
Thread producer1 = new Thread(() -> {
try {
for (int i = 1; i <= 5; i++) {
buffer.put("Item-P1-" + i);
Thread.sleep(100);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Producer-1");
Thread producer2 = new Thread(() -> {
try {
for (int i = 1; i <= 5; i++) {
buffer.put("Item-P2-" + i);
Thread.sleep(150);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Producer-2");
// 消费者线程
Thread consumer1 = new Thread(() -> {
try {
for (int i = 1; i <= 5; i++) {
buffer.take();
Thread.sleep(200);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Consumer-1");
Thread consumer2 = new Thread(() -> {
try {
for (int i = 1; i <= 5; i++) {
buffer.take();
Thread.sleep(300);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Consumer-2");
producer1.start();
producer2.start();
consumer1.start();
consumer2.start();
}
}
上面这个示例中,就是通过notFull和notEmpty进行控制,从而完成了一个完整的有界缓冲区的实现。其中要注意的是,await()要放在while循环里,避免虚假唤醒;finally块中要记得释放锁。
关于原子类Atomic
atomic包提供了一系列原子类,提供了无锁的线程安全操作。
AtomicInteger
提供了对于int类型变量的原子操作,例如set(int newVal),get(),.incrementAndGet()等常用的操作。
AtomicReference
提供了对数据结构对象引用的原子更新操作。
CAS算法
Compare-And-Swap是一种无锁算法,包括三个操作数
- 内存位置V
- 预期原值A
- 新值B
其操作流程为
- 比较内存位置V的值和预期值A
- 如果相等,将内存位置V的值更新为B
- 否则,不做任何操作
- 返回操作是否成功
一个经典问题是ABA问题,即一个值原来是A,后来变成了B然后又变成了A,这时候CAS检查就会认为它的值没有变化。为了解决这一问题,AtomicStampedReference引入了时间戳,更新时不仅比较值,也会比较版本号。
在高并发的场景下,原子类比synchronized具有更好的性能表现,能够有效避免线程阻塞和上下文切换的开销。