项目开发笔记-4

Java并发编程

Posted by Sirin on November 4, 2025

并发Concurrent与并行Parallel的区别?

并发是同一时间段内处理多个任务,但不是同时执行,可以通过时间片轮转在单核CPU上实现,也就是说并发是逻辑上的同时进行

并行则是真正同时执行多个任务,必须有多核处理器或者多个处理器来支持,并行是物理上的同时进行

Java内存模型 Java Memory Model

内存模型的结构包括:

  • 主内存main memory 所有线程共享的内存区域,存储所有变量的主副本。用于存储实例字段和静态字段,是变量的唯一主副本。
  • 工作内存working memory 每个线程私有的内存区域,存储主内存中变量的副本。线程之间互相隔离,负责存储变量的副本,线程直接操作工作内存。

内存的交互操作包括:

  • 锁定 lock: 作用于主内存变量,将其标记为线程私有
  • 解锁 unlock: 作用于主内存变量,将其释放锁定状态
  • 读取 read: 从主内存读取变量到工作内存缓冲区
  • 载入 load:将read的值放入工作内存的变量副本
  • 使用 use: 将工作内存变量值传递给执行引擎
  • 赋值 assign: 将执行引擎的值传递给工作内存变量
  • 存储 store: 将工作内存变量值传送到主内存(先存储在工作内存缓冲区)
  • 写入 write: 将store的值写入主内存变量

之所以采取这种 read+loadstore+write的原子操作组合,是为了避免频繁修改导致的频繁访存,同时也能够很好的提供精确控制,避免死锁,保证一致性。

常见的线程安全问题

竞态条件: 多个线程同时访问和修改共享数据,最终结果依赖于线程执行的时序

数据竞争: 多个线程同时访问同一内存位置,且至少有一个是写操作,没有适当的同步

死锁: 两个或者多个线程互相等待对方释放资源,导致程序永久阻塞

并发编程的三大特性

可见性: 当一个线程修改了共享变量的值,其他线程能够立即可见这一修改

原子性: 一个操作或者一组操作要么全部执行完成,要么全部不完成,不会被其他线程打断

有序性: 程序执行的顺序按照代码的先后顺序执行,但编译器和处理器可能会进行指令重排序优化。

关于线程的创建

特性 Thread 类 Runnable 接口 Callable 接口
java.lang java.lang java.util.concurrent
方法 run() run() call()
返回值 无 (void) 无 (void) 有 (泛型类型)
异常 不能抛出受检异常 不能抛出受检异常 可以抛出受检异常
使用方式 继承 Thread 类 实现 Runnable 接口 实现 Callable 接口
执行方式 thread.start() new Thread(runnable).start() executor.submit(callable)
结果获取 无直接方式 无直接方式 通过 Future 对象获取
适用场景 简单的线程任务 任务与执行分离,资源共享 需要返回结果、异常处理的复杂任务

通常来说,可以使用lambda表达式简化Runnable和Callable接口创建线程的语法。

public class LambdaThreadExample {
    public static void main(String[] args) {
        // 使用Lambda表达式创建线程
        Thread thread1 = new Thread( () -> {
            for (int i = 0; i < 5; i++) {
                System.out.println("Lambda线程: " + i);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } );
        
        // 使用方法引用
        Thread thread2 = new Thread(LambdaThreadExample::printNumbers);
        
        thread1.start();
        thread2.start();
    }
    
    public static void printNumbers() {
        for (int i = 0; i < 5; i++) {
            System.out.println("方法引用线程: " + i);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

关于线程的状态、优先级与守护线程Daemon

线程的状态包括:

  • NEW: 线程对象已经创建但尚未调用Start()方法
  • RUNNABLE: 线程正在JVM中执行
  • BLOCKED: 线程被阻塞以等待监视器锁
  • WAITING: 线程无限期等待另一个线程执行特定操作
  • TIMED_WAITING: 线程等待指定的时间后自动返回
  • TERMINATED:线程执行完毕或者异常退出

线程的优先级包括:

  • MIN_PRIORITY(1)
  • NORM_PRIORITY(5)
  • MAX_PRIORITY(10)

守护线程是为其他线程提供服务的,必须在.start()之前进行设置.setDaemon(true)。当所有非守护线程结束时,JVM会退出。比如垃圾回收器GC就是典型的守护线程。

关于Synchronized

synchronized是最基本的同步机制,基于监视器锁Monitor Lock实现。每个Java对象都有一个内置的监视器锁,当线程进入到synchronized代码块或者方法时,就会自动获取这个锁。

该关键字保证了同一时刻只有一个线程可以执行被同步的代码

其机制包括

  • 获取锁: 线程进入synchronized代码块时,会尝试获取监视器锁,如果锁可用,则获取锁并继续执行
  • 等待锁: 如果监视器锁被其他线程持有,当前线程会被阻塞,进入等待状态,直到锁被释放
  • 释放锁: 线程执行完synchronized代码块后,会自动释放锁,唤醒等待的线程

synchronized可以修饰实例方法、静态方法和代码块,修饰的对象不同,获取的锁也不同。

对象锁(实例锁)

每个对象实例都有一个锁,不同对象实例的锁是完全独立的。

// 实例方法同步
public synchronized void instanceMethod() {
  	// 获取当前对象的锁
}


// 代码块同步
public void method() {
  	sychronized(this) {
      	// 获取当前对象的锁
    }
}

类锁(静态锁)

每个类的Class对象都有一个锁,因为每个类只有一个Class对象,所以是全局唯一的。

// 静态方法同步
public static synchronized void staticMethod() {
  	// 获取类的Class对象的锁
}

// Class对象同步
public void method() {
  	synchronized(MyClass.class) {
      	// 获取类的Class对象的锁
    }
}

⚠️注意

对象锁和类锁是完全独立的,一个线程获取了对象锁,不会阻止其他线程获取类锁。

死锁的条件

  • 互斥条件:资源不能被多个线程使用
  • 持有和等待:线程持有资源的同时在等待其他资源
  • 不可剥夺:资源不能强制从线程手中剥夺
  • 循环等待:存在线程资源的循环等待链

死锁预防策略

  • 锁排序:所有线程按照相同顺序获得锁
  • 锁超时:使用tryLock()设置获取锁的超时时间
  • 死锁检测:定期检测死锁并采取恢复措施
  • 避免嵌套锁:避免在持有锁的情况下获取锁

synchronized优化策略

减小锁粒度

// 不推荐 粗粒度锁
synchronized(this) {
  	update1();
  	update2();
}

// 细粒度锁
synchronized(lock1) { update1(); }
synchronized(lock2) { update2(); }

读写锁分离

ReadWriteLock rwLock = new ReentrantReadWriteLock();
Lock readLock = rwLock.readLock();
Lock writeLock = rwLock.writeLock();

由于JIT编译器本身的优化,在对象不存在逃逸(不会被其他线程访问)的情况,JVM会消除锁操作。

关于volatile

volatile关键字提供了比synchronized更轻量级的同步机制,主要是解决变量在多线程环境下的可见性问题。

  • 可见性保证:当一个线程修改volatile变量时,其他线程都立即可见这个操作
  • 禁止重排序:编译器和处理器不会对volatile变量的读写进行重排序,保证程序执行的有序性
  • 非原子性:对volatile变量的复合操作不保证原子性,如i++这样的操作仍然需要额外的同步机制

volatile实现机制

当一个变量被声明为volatile时,JVM会在适当的地方插入内存屏障指令,保证:

  • 写操作立即刷新到主内存,并使得其他CPU缓存失效
  • 读操作直接从主内存中读取最新值,不适用缓存
  • 防止指令重排序

关于内存屏障

内存屏障是一种CPU指令,用于控制特定条件下的内存操作顺序和可见性,包括四种:LoadLoad屏障,LoadStore屏障,StoreLoad屏障(开销最大),StoreStore屏障。其作用可以简略表述为:保证只有屏障前的操作执行完后,才能执行屏障后的操作。

// 写操作的内存屏障
StoreStore
volatile Write
StoreLoad
  
// 读操作的内存屏障
volatile Read
LoadLoad
LoadStore

关于happens-before规则

我们说操作A happens-before 操作B,意思是操作A的结果对于操作B来说是可见的。这里happens-before原则描述的是内存可见性,而不是时间关系。

对比synchronized与volatile

volatile synchronized
轻量级同步机制 重量级同步机制
可见性,有序性 可见性,有序性,原子性
不保证原子性 互斥访问
无阻塞特性 可能会导致线程阻塞
性能开销小 性能开销大

关于Lock

Lock接口提供了比synchronized关键字更灵活、更强大的锁机制,为synchronized无法满足需求时,提供了更多的选择,其核心方法包括:

public interface Lock {
    // 获取锁,如果锁不可用则阻塞等待
    void lock();
    
    // 可中断锁获取
    void lockInterruptibly() throws InterruptedException;
    
    // 尝试获取锁,立即返回结果
    boolean tryLock();
    
    // 在指定时间内尝试获取锁
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    
    // 释放锁
    void unlock();
    
    // 创建条件变量
    Condition newCondition();
}

ReentrantLock

ReentrantLock是可重入的互斥锁,可重入 意味着同一个线程可以多次获取同一把锁。

注意,与synchronized能够自动释放锁不同,使用Lock时必须在finally块中释放锁lock.unlock(),否则会导致死锁。

公平锁fair lock

按照线程请求锁的顺序来分配锁,先到先得。保证了公平性,但是性能较低。

非公平锁unfair lock

不保证线程获取锁的顺序,新来的线程可能比等待中的线程更早获得锁。性能更好,但容易导致线程饥饿。

可中断锁

等待获取锁的线程(尚未获得锁而被阻塞的线程)可以被中断,并抛出InterruptedException

Lock lock = new ReentrantLock();

// 传统方式 - 不可中断
lock.lock(); // 如果锁被占用,线程会一直阻塞,无法被中断

// 可中断方式
lock.lockInterruptibly(); // 如果锁被占用,线程在等待时可以被其他线程中断

超时锁

可以指定线程对该锁的最长等待时间,成功时返回true,失败时返回false,避免死锁风险。

Lock lock = new ReentrantLock();

// 尝试获取锁,最多等待指定时间
if (lock.tryLock(3, TimeUnit.SECONDS)) {
    try {
        // 获取锁成功,执行临界区代码
    } finally {
        lock.unlock();
    }
} else {
    // 超时未获得锁,执行备用方案
}

关于读写锁

读写锁ReadWriteLock是一种特殊的锁机制,允许多个进程同时持有读锁,但只有一个进程可以获取写锁,读锁和写锁互斥,写锁与写锁互斥。这一设计是基于读操作数量更多且不会造成数据竞争。

ReentrantReadWriteLock

在Java中读写锁接口的标准实现是ReentrantReadWriteLock,该类提供了可重入的读写锁功能,类内提供了一个读锁和一个写锁,它们共享一个同步状态。同样支持公平锁和非公平锁两种模式,并支持tryLock()方法可以设置超时时间。

ReentrantReadWriteLock支持锁降级(持有写锁时可以获取读锁,然后将写锁释放,从写锁降级为读锁),为了避免死锁,不支持锁升级(即从持有读锁升级为写锁,防止多个持有读锁的进程尝试升级)。

这种方式也保证了数据的一致性,即数据更新后线程可以立刻读取最新的数据,避免释放写锁后其他线程修改数据。

ReadWriteLock rwLock = new ReentrantReadWriteLock();

// 写锁降级是安全的
rwLock.writeLock().lock();
try {
    // 修改数据...
    
    // 在释放写锁前获取读锁
    rwLock.readLock().lock(); // 降级开始
} finally {
    rwLock.writeLock().unlock(); // 降级完成
}

// 现在只有读锁,其他线程可以并发读取

StampedLock

在Java 8中引入的新特性,提供了写锁、悲观读锁和乐观读三种模式,相比ReentrantReadWriteLock,在读多写少的情况下性能更优。

  • 写锁 独占锁,类似于ReentrantReadWriteLock的写锁,但不允许重入
  • 悲观读锁 共享锁,同样的,不允许重入
  • 乐观读 不是采用锁,而是采用版本号来检查数据是否被修改,性能更好

关于Condition接口

Condition接口必须配合Lock使用,提供了比Object.wait/notify更强大和灵活的线程间通信制度。Condition允许线程在特定条件下等待,并在满足条件时被唤醒。

主要方法包括:

  • await() 使当前线程等待,直到被signal或者interrupt

  • signal() 唤醒一个等待的线程

  • signalAll()

    唤醒所有等待的线程。

  • awaitUntil(Date ddl) 等待到指定的时间点

  • awaitUninterruptibly() 设置不可中断的等待

一个示例如下:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionBasic {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private boolean ready = false;

    public void waitForReady() throws InterruptedException {
        lock.lock();
        try {
            while (!ready) {
                System.out.println(Thread.currentThread().getName() + " waits for condition");
                condition.await(); // 等待条件
            }
            System.out.println(Thread.currentThread().getName() + " Condition is ready, execute now");
        } finally {
            lock.unlock();
        }
    }

    public void setReady() {
        lock.lock();
        try {
            ready = true;
            System.out.println("Condition is ready");
            condition.signalAll(); // 唤醒所有等待的线程
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ConditionBasic example = new ConditionBasic();

        // 启动等待线程
        Thread waiter1 = new Thread(() -> {
            try {
                example.waitForReady();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Waiter-1");

        Thread waiter2 = new Thread(() -> {
            try {
                example.waitForReady();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Waiter-2");

        waiter1.start();
        waiter2.start();

        // 等待2秒后设置条件
        Thread.sleep(2000);
        example.setReady();

        waiter1.join();
        waiter2.join();
    }
}

生产者消费者模式实现

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class BoundedBuffer {
    private final Object[] buffer;
    private int putIndex = 0;
    private int takeIndex = 0;
    private int count = 0;
    
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    
    public BoundedBuffer(int capacity) {
        buffer = new Object[capacity];
    }
    
  	// 用于生产者
    public void put(T item) throws InterruptedException {
        lock.lock();
        try {
            // 等待缓冲区的空余空间
            while (count == buffer.length) {
                System.out.println(Thread.currentThread().getName() + " wait for buffer space");
                notFull.await();	// await总是要放在while循环里
            }
            
            buffer[putIndex] = item;
            putIndex = (putIndex + 1) % buffer.length;
            count++;
            
          	// 打印生产信息
            System.out.println(Thread.currentThread().getName() + " produced: " + item + ", now buffered: " + count);
            
            // 通知消费者
            notEmpty.signal();
        } finally {
            lock.unlock();	// 不要忘记释放锁
        }
    }
    
  
  	// 用于消费者
    @SuppressWarnings("unchecked")
    public T take() throws InterruptedException {
        lock.lock();
        try {
            // 等待缓冲非空
            while (count == 0) {
                System.out.println(Thread.currentThread().getName() + " wait for non-empty buffer space");
                notEmpty.await();
            }
            
            T item = (T) buffer[takeIndex];
            buffer[takeIndex] = null;
            takeIndex = (takeIndex + 1) % buffer.length;
            count--;
            
          	// 打印消费信息
            System.out.println(Thread.currentThread().getName() + " consumed: " + item + ", now buffered: " + count);
            
            // 通知生产者
            notFull.signal();
            
            return item;
        } finally {
            lock.unlock();
        }
    }
    
    public static void main(String[] args) {
        BoundedBuffer buffer = new BoundedBuffer<>(3);
        
        // 生产者线程
        Thread producer1 = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    buffer.put("Item-P1-" + i);
                    Thread.sleep(100);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Producer-1");
        
        Thread producer2 = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    buffer.put("Item-P2-" + i);
                    Thread.sleep(150);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Producer-2");
        
        // 消费者线程
        Thread consumer1 = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    buffer.take();
                    Thread.sleep(200);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Consumer-1");
        
        Thread consumer2 = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    buffer.take();
                    Thread.sleep(300);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Consumer-2");
        
        producer1.start();
        producer2.start();
        consumer1.start();
        consumer2.start();
    }
}

上面这个示例中,就是通过notFullnotEmpty进行控制,从而完成了一个完整的有界缓冲区的实现。其中要注意的是,await()要放在while循环里,避免虚假唤醒;finally块中要记得释放锁。

关于原子类Atomic

atomic包提供了一系列原子类,提供了无锁的线程安全操作。

AtomicInteger

提供了对于int类型变量的原子操作,例如set(int newVal),get(),.incrementAndGet()等常用的操作。

AtomicReference

提供了对数据结构对象引用的原子更新操作。

CAS算法

Compare-And-Swap是一种无锁算法,包括三个操作数

  • 内存位置V
  • 预期原值A
  • 新值B

其操作流程为

  1. 比较内存位置V的值和预期值A
  2. 如果相等,将内存位置V的值更新为B
  3. 否则,不做任何操作
  4. 返回操作是否成功

一个经典问题是ABA问题,即一个值原来是A,后来变成了B然后又变成了A,这时候CAS检查就会认为它的值没有变化。为了解决这一问题,AtomicStampedReference引入了时间戳,更新时不仅比较值,也会比较版本号。

在高并发的场景下,原子类比synchronized具有更好的性能表现,能够有效避免线程阻塞和上下文切换的开销。