复习一下基础~

概念

悲观锁

悲观锁认为被它保护的数据是极其不安全的,每时每刻都有可能变动,一个线程拿到悲观锁后,不允许任何线程对其进行获取,需要阻塞等待解锁

乐观锁

乐观锁认为数据变动不会太频繁,读取时不会上锁,但在写入时会进行判断数据是否最新。例如通过版本号实现,读取时一并读取版本号,写入时对比版本号。不一致则拒绝写入

CAS 无锁算法

Compare and Swap(比较并交换),CAS是一种无锁算法(不用锁实现线程同步,也叫非阻塞同步)。JUC就是建立在CAS之上的。

假设内存原数据V,旧预期值A,需要修改的新值B,则CAS的语义是“我认为V的值应该为A,如果是,那么将V的值更新为B,否则不修改并告诉V的值实际为多少”。有以下操作:

  1. 比较A与V是否相等(比较)
  2. 相等则将B写入V(交换)
  3. 返回结果(是否成功写入)

多个线程操作只有一个能成功,可见CAS是一个乐观锁

JUC并发包

从JDK1.5开始,引入了一个高级的处理并发的java.util.concurrent包,简称JUC。它提供了大量更高级的并发功能,能大大简化多线程程序的编写。

ReentranLock 可重入锁

synchronized一样。ReentrantLock 是一个可重用的互斥锁,又称独占锁。性能与 synchronized 相近,但功能更加丰富

public class Counter {
    private int count;

    public synchronized void add(int n) {
        count += n;
    }
}
// 等价于
public class Counter {
    private final Lock lock = new ReentrantLock();
    private int count;

    public void add(int n) {
        lock.lock();
        try {
            count += n;
        } finally {
            lock.unlock();
        }
    }
}

因为synchronized是Java语言层面提供的语法,即使抛出异常也能释放锁,而ReentranLock是JUC中用Java实现的,所以要在finally中释放。

ReentranLock可以尝试获取锁:

// 1秒内没取到锁返回false
if (lock.tryLock(1, TimeUnit.SECONDS)) {    
    try {
        ...
    } finally {
        lock.unlock();
    }
}

Condition 线程协作

synchronized可以使用waitnotify进行线程间的协作,但ReentrantLock不能直接使用waitnotify

于是有了Condition,以之前的TaskQueue为例:

class TaskQueue {
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private Queue<String> queue = new LinkedList<>();

    public void addTask(String s) {
        lock.lock();
        try {
            queue.add(s);
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public String getTask() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                condition.await();
            }
            return queue.remove();
        } finally {
            lock.unlock();
        }
    }
}

Condition对象必须从Lock实例的newCondition()返回,这样才能获得一个绑定了Lock实例的Condition实例

Condition提供的await()signal()signalAll()相当于synchronized锁对象的wait()notify()notifyAll()

tryLock()类似,await()可以在等待指定时间后,如果还没有被其他线程通过signal()signalAll()唤醒,可以自己醒来:

if (condition.await(1, TimeUnit.SECOND)) {
    // 被其他线程唤醒
} else {
    // 指定时间内没有被其他线程唤醒
}

ReadWriteLock 悲观读写锁

无论是synchronized还是ReentranLock,都只允许一个线程在临界区内执行代码。但很多时候我们需要更灵活地控制,例如IO操作时:读取不会影响数据,我们希望能多个线程同时读,而写入时不允许其它任何线程读取和写入。

使用ReadWriteLock可以解决这个问题:

  • 只允许一个线程写入(其他线程既不能写入也不能读取);
  • 没有写入时,多个线程允许同时读(提高性能)。

例:

public class FileIO {
    private final ReadWriteLock rwlock = new ReentrantReadWriteLock();
    private final Lock rlock = rwlock.readLock();
    private final Lock wlock = rwlock.writeLock();

    public void write(char[] data) {
        wlock.lock();       // 加写锁
        try {
            // 写入
        } finally {
            wlock.unlock(); // 释放写锁
        }
    }

    public char[] read() {
        rlock.lock();       // 加读锁
        try {
            // 读取
        } finally {
            rlock.unlock(); // 释放读锁
        }
    }
}

StampedLock 乐观读写锁

StampedLock是JDK8新引入的读写锁,和ReadWriteLock的区别在于。后者如果有线程在读,写入线程需要等读取线程释放后才能写入。读的过程不能写,这是一种悲观的读锁。

StampedLock支持在读取的过程中写入,但这样数据就可能不一致,需要一些额外的判断。这是一种乐观锁

我们将上面ReadWriteLock的例子修改为StampedLock的版本:

public class FileIO {
    private final StampedLock stampedLock = new StampedLock();

    public void write(char[] data) {
        long stamp = stampedLock.writeLock();   // 获取写锁
        try {
            // 写入
        } finally {
            stampedLock.unlockWrite(stamp);     // 释放写锁
        }
    }

    public char[] read() {
        long stamp = stampedLock.tryOptimisticRead();   // 获取乐观读锁
        
        // 读取...
        // 读取的中途可能有写入 导致数据不一致
        
        if(!stampedLock.validate(stamp)) {     // 检查是否有其他写锁发生
            stamp = stampedLock.readLock();     // 获取悲观读锁
            try {
                // 读取...
            } finally {
                stampedLock.unlockRead(stamp);  // 释放悲观读锁
            }
        }
    }
}

ReadWriteLock相比,写入是一样的,不同的是读取:

  1. 先获取一个乐观读锁,并返回版本号。
  2. 如果读取过程中没有写入,版本号不变,则顺利执行。
  3. 若版本号变化,说明发生写入,需要获取一个悲观读锁再次读取

需要注意的是StampedLock是不可重入锁,不能在一个线程中反复获取同一个锁。

StampedLock还提供了更复杂的将悲观读锁升级为写锁的功能,它主要使用在if-then-update的场景:即先读,如果读的数据满足条件,就返回,如果读的数据不满足条件,再尝试写。

Concurrent 线程安全集合

我们之前尝试设计了一个线程安全的阻塞任务队列TaskQueue,实际上JUC早已提供此类线程安全的集合类:

接口 非线程安全 线程安全
List ArrayList CopyOnWriteArrayList
Map HashMap ConcurrentHashMap
Set HashSet / TreeSet CopyOnWriteArraySet
Queue ArrayDeque / LinkedList ArrayBlockingQueue / LinkedBlockingQueue
Deque ArrayDeque / LinkedList LinkedBlockingDeque

因为实现的是一样的接口,所以用法和非线程安全的集合类完全相同。

java.util.Collections工具类还提供了一个旧的线程安全集合转换器,例如将HashMap转换为线程安全的:Collections.synchronizedMap(map);

但是它实际上是通过包装类包装后对所有读写方法都用synchronized加锁,这样获得的线程安全集合的性能比java.util.concurrent集合要低很多,所以不推荐使用。

Atomic 原子类

JUC在java.util.concurrent.atomic包中提供了一组原子操作的封装类,是通过CAS实现的

例如AtomicInteger,主要方法有:

  • 增加指定值并返回:int addAndGet(int delta)
  • 自增并返回:int incrementAndGet()
  • 获取当前值:int get()
  • 用CAS方式设置(如果当前值为expect,则更新为update):boolean compareAndSet(int expect, int update)

如果自己通过CAS实现incrementAndSet(),大概是这样:

public int incrementAndGet() {
    int prev, next;
    do {
        prev = this.get();
        next = prev + 1;
    } while (!this.compareAndSet(prev, next));
    return next;
}

我们利用AtomicLong可以编写一个多线程安全的全局唯一ID生成器:

class IdGenerator {
    AtomicLong id = new AtomicLong(0);

    public long getNextId() {
        return id.incrementAndGet();
    }
}

JDK8中还添加了LongAdderLongAccumulato等原子类

ExecutorService 线程池

线程的创建、销毁都是非常消耗时间的,而通过线程池,我们可以复用一组线程,

Exeutor是线程池里的顶级接口,ExecutorService是继承了前者提供服务的接口,典型用法如下:

// 创建固定大小的线程池:
ExecutorService executor = Executors.newFixedThreadPool(3);
// 提交任务:
executor.submit(runnable);
// 关闭线程池
executor.shutdown();

JUC提供了几个ExcutorService的常用实现,可以通过ExecutorsnewXXX()系列静态工厂方法来创建:

  • FixedThreadPool:固定线程数的线程池,每次提交任务都会新增线程直到数量限制。有线程异常会新建线程替换
  • CachedThreadPool:可缓存的线程池,会重复利用线程,无线程可用则新建,会回收60秒未使用的线程
  • SingleThreadExecuor:单线程的线程池,有线程异常会新建线程替换
  • ScheduledThreadPool:支持定时任务和周期运行的线程池

ScheduledThreadPool 周期线程池

ScheduledThreadPool支持线程的周期运行,一个定时任务代表线程池的一个线程:

ScheduledExecutorService ses = Executors.newScheduledThreadPool(4);
// 1秒后执行一次性任务
ses.schedule(runnable, 1, TimeUnit.SECONDS);
// 2秒后开始执行定时任务,每3秒执行:
ses.scheduleAtFixedRate(runnable, 2, 3, TimeUnit.SECONDS);
// 2秒后开始执行定时任务,以3秒为间隔执行(上一个任务执行结束后开始算)
ses.scheduleWithFixedDelay(runnable, 2, 3, TimeUnit.SECONDS);

推荐使用ScheduledExecutorService替代TimerScheduledExecutorService功能更强大也更安全。

  • 需要注意的是,在FixedRate模型下,当运行时间大于period间隔时间时,例如每三秒执行一次,但每个任务执行五秒,这时候后面的任务会等前面的执行完再运行
  • 抛出异常后,任务后续不会继续执行,并且没有报错,要注意异常捕获

ThreadPoolExecutor 自定义线程池

ThreadPoolExecutor是线程池的真正实现,上面的工厂方法也是对其设置参数的封装。

在《阿里巴巴Java开发手册》中也规定要使用其来创建线程池:

【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样 的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明:Executors 返回的线程池对象的弊端如下:

  • 1)FixedThreadPoolSingleThreadPool: 允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。

  • 2)CachedThreadPoolScheduledThreadPool: 允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM

看来下它的完整参数构造方法:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
  • int corePoolSize:核心线程数量。即使在空闲状态也会被保留,除非设置了allowCoreThreadTimeOuttrue
  • int maximumPoolSize:允许的最大线程数量,不允许小于corePoolSize
  • long keepAliveTime: 当线程数大于核心线程数时,非核心线程的闲置超时时间。
  • TimeUnit unitkeepAliveTime参数的时间单位,例TimeUnit.SECONDS为秒。
  • BlockingQueue<Runnable> workQueue:任务队列,常见有三种队列:SynchronousQueueLinkedBlockingDequeArrayBlockingQueue
  • ThreadFactory threadFactory:线程工厂,提供创建新线程的功能。《阿里巴巴Java开发手册》规定要使用带有这个参数的构造方法,来方便地管理线程。
  • RejectedExecutionHandler handler:当到达线程池资源上限时,添加新线程会调用RejectedExecutionHandler接口的rejectedExecution方法

任务队列规则

线程添加的规则和workQueue的类型有很大关系,当一个线程任务添加到线程池时:

  • 如果线程池中的线程数量 <= corePoolSize,则新建线程,不放入队列

  • 如果线程池中的线程数量 > corePoolSize,<= maximumPoolSize时。

    • 队列为LinkedBlockingDeque,则放入队列排队。如果队列已满,则直接新建线程执行任务
    • 队列为SynchronousQueue,则直接新建线程处理该任务
    • 非核心线程闲置时间超过keepAliveTime后就会被销毁。
  • 如果线程池中的线程数量 > corePoolSizemaximumPoolSize

    • 队列为LinkedBlockingDeque,如果队列没有大小限制,会将任务放进队列。如果队列有大小限制,则交给handler拒绝任务。
    • 队列为SynchronousQueue,会使用handler拒绝任务。

Future 异步返回机制

Runnable没有返回值比较不方便,JUC提供了个Callable泛型接口,和前者相比,多了一个返回值,并且可以通过泛型指定返回结果。

前面提到的线程ExecutorServicesubmit()返回了一个Future,一个Future类型的实例代表一个未来能获取结果的对象:

ExecutorService e = Executors.newSingleThreadExecutor();
Future<String> future = e.submit(new Callable<String>() {
    @Override
    public String call() throws Exception {
        return "task result";
    }
});
// 返回结果 任务未完成则阻塞等待
try {
    future.get();
} catch (ExecutionException ex) {
    ex.printStackTrace();
}

Future还有以下方法:

  • get():获取结果(可能会等待)
  • get(long timeout, TimeUnit unit):获取结果,但只等待指定的时间;
  • cancel(boolean mayInterruptIfRunning):取消当前任务;
  • isDone():判断任务是否已完成。

Semaphore 信号量

Semaphore 本质上是一个共享锁,线程获取到许可时才能执行:

ExecutorService executor = Executors.newCachedThreadPool();
// 最多同时发放3个许可
Semaphore s = new Semaphore(3);
for(int i = 0; i < 10; i++) {
    executor.execute(() -> {
        try {
            // 获取许可
            s.acquire();
            // 执行操作
            Thread.sleep(3000);
            // 释放
            s.release();
        } catch(Exception e) {
            
        }
    }
}