并发编程
- 并发:指两个或多个事件在同一个时间段内发生。
- 并行:指两个或多个事件在同一时刻发生(同时发生)。
超线程:一个ALU对应多个PC
并发程序的特点:
- 线程之间相互制约的关系
- 线程执行过程需要上下文切换 断断续续的
- 并发数设置合理时(以CPU) 才会提高并发程序的性能
线程间通信
- 等待-唤醒机制
sequenceDiagram 生产者 ->> 同步对象: wait 消费者 ->> 同步对象: notify 同步对象 -->> 生产者: 继续执行
要注意,wait() notify() notifyAll()都需要在synchronized中
wait() 会释放锁,sleep() 不会
Object object = new Object();new Thread(){ @Override public void run() { synchronized (object){ System.out.println("要5个包子"); // 进入等待,这时候锁会被释放 try { object.wait(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("得到了5个包子"); } }}.start();new Thread(){ @Override public void run() { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (object){ System.out.println("包子生产完毕,告诉顾客"); // 通知等待线程中的任意一个 object.notify(); } }}.start();
- wait与notify一定要在线程同步中使用,并且是同一个锁的资源
- 在调用sleep()方法的过程中,线程不会释放对象锁
对象的共享
- 发布: 使对象能在当前作用域之外使用
- 逸出: 某个不该发布的对象被发布了
如果this在构造器完成构造之前逸出,还没被构造完成的对象被别人使用,会发生什么问题?
线程封闭
某个对象只能在某个线程之内使用
Ad-hoc线程封闭
- 完全由程序保证对象不被逸出,很脆弱
栈封闭
- 对象只能在局部(方法内)使用
public void process() { Object obj = new Object(); // 对对象做些计算 int result = obj.process(); return result;}
ThreadLocal
- 保证线程自己的数据封闭在本线程内
private static ThreadLocal<Connection> holder = ThreadLocal.withInitial(() -> getConnection());
不变性
不可变对象一定是线程安全的
- 对象创建后其状态就不能修改
- 对象的所有域都是final
- 在对象创建的过程中this引用没有逸出
安全发布
在多线程环境下使用可变的对象,需要通过安全发布的方式并且需要通过锁来保护
- 在静态初始化函数中初始化一个对象的引用(JVM同步机制保障)
- 将对象的引用保存到volatile类型的域或者 Reference对象
- 将对象的引用保存到正确初始化的对象的final域
- 将对象的引用保存到由锁保护的域
对象的组合
- 如何构建线程安全的类?
依赖状态的操作:某个操作包含有基于状态的先验操作
if (a== 1){ a++;}
在并发编程中,由于其他线程也会修改状态,所以需要一些JUC中的基础类库来帮助我们在并发环境下执行基于依赖的操作
在提供多线程API时,将是否线程安全文档化
实例封闭
- 监视器模式(客户端加锁),最简单的封闭机制
将线程不安全的对象封装在某个进行良好并发控制的对象内
private Object obj = new Object();...synchronized(obj){ obj.xxx();}
线程安全委托
线程不安全的对象将线程安全的职责委托给线程安全的对象
// 线程安全的类private AtomicInteger coutner = new AtomicInteger();...void increase() { counter.increase();}
这种方式要求委托方对被委托方的API调用不能出现复合操作,否则委托方仍需要采用一定的线程安全机制
private AtomicInteger coutner1 = new AtomicInteger();private AtomicInteger coutner2 = new AtomicInteger();...// × 不安全void increase() { coutner1.increase(); coutner2.increase();}
取消与关闭
一个可取消的任务必须拥有取消策略
while(runnable) { // do something}
使用中断来取消是最合理的方式,线程中断是线程之间协作的一种手段,中断是取消的一种语义实现,所以这要求你自己的线程必须决定如何响应中断
除非知道某个线程的中断策略,否则不要中断该线程
// thread1while(!isInterrupted()){ System.out.println("running"); try { Thread.sleep(1000); } catch (InterruptedException e) { break; }}System.out.println("my thread done");// main threadthread1.interrupt();
JVM 在线程阻塞状态时若发生中断,会抛出一个中断异常,在非阻塞情况下,就需要检查中断状态来判断是否发生中断
使用Future取消
Future<Double> future = service.submit(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return Math.random();});try { Double ret = future.get(3, TimeUnit.SECONDS); System.out.println("result"+ret);} catch (ExecutionException | TimeoutException e) { e.printStackTrace();}finally { future.cancel(true); System.out.println("task cancel");}
处理不可中断的阻塞
由于如IO等的资源一旦阻塞就无法进行中断,所以可对其做关闭处理来模拟中断
停止基于线程的服务
基于生产者消费者的队列模式,要求消费者等待生产者完全关闭后,才能安全结束
- 使用ExecutorService的生命周期管理方法
- 毒药对象
- 本质上就是一个flag,当队列读取到这个毒药时,就会停止相关操作 这要求生产与消费者数量都要是已知的 因为只有接收到确定数量的毒药对象 才能判断是否所有生产者都停止了
处理非正常的线程终止
thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { @Override public void uncaughtException(Thread t, Throwable e) { System.out.println(t + "something happen" + e); }});new Thread(){ @Override public void run() { throw new RuntimeException("aaaa"); }}.start();
JVM关闭钩子
Runtime.getRuntime().addShutdownHook(new Thread(){ @Override public void run() { System.out.println("jvm shutdown"); }});
性能与伸缩性
引入线程的开销
- 上下文切换:切换浪费的CPU周期、重新加载线程重新载入数据的开销
- 内存同步:使用如 synchronized 的机制,线程之间竞争带来的开销
- 阻塞:IO操作或者竞争失败的线程可能会陷入阻塞等待被唤醒
如何减少锁的竞争
锁的请求频率越快 持有锁时间越长 竞争越激烈
- 缩小锁的范围(快进快出)
- 缩小synchronized关键字包围的代码块
- 减小锁的粒度
- 不同的操作使用不同的锁
- 分段锁 对集合元素进行平均拆分 使用 N 个锁 使一个锁保护 1/N 个元素
- 避免热点
- 替代独占锁
- 采取读写锁
并发程序测试
正确性测试
- 验证在不变性条件下后验条件是不是正确的
传统的单元测试只能在线程串行的情况运行
阻塞行为的测试:一个阻塞方法调用后线程应该等待到直至该线程被中断,抛出InterruptExpcetion
安全性测试:检查在并发情况下,极易发生错误的一些属性
资源管理测试:如测试对资源的限制是否真正起作用了
使用回调帮助测试:对于一些并发类库,会在某些节点回调客户端代码,可以利用这些回调来验证后验条件
加大线程切换以暴露错误:通过Thread.yield() 让步,产生更多的上下文切换,可能会更早暴露出错误
性能测试
使用场景选择 -> 多次执行场景 -> 衡量执行效率
性能测试陷阱
JVM 的某些行为会导致性能测试测量不准
- 垃圾回收STW
- 动态编译(JIT)导致热点代码被本地编译
- 编译优化
- 竞争程度,竞争的激烈程度应贴合真实场景
锁优化
自旋锁与自适应自旋
是让一个线程在请求一个共享数据的锁时执行忙循环(自旋)一段时间,如果在这段时间内能获得锁,就可以避免进入阻塞状态
自旋等待本身虽然避免了线程切换的开销,但它是要占用处理器时间的,如果等待时间比较短,自旋还是很划算的
自旋超过一定的阈值就不会再继续重试,自适应自旋则代表这个阈值不是固定的,会根据性能监控情况动态调整
锁消除
对于被检测出不可能存在竞争的共享数据的锁进行消除
锁细化
尽力缩小锁的作用范围
锁粗化
如果一系列的连续操作都对同一个对象反复加锁和解锁,频繁的加锁操作就会导致性能损耗
如果虚拟机探测到由这样的一串零碎的操作都对同一个对象加锁,将会把加锁的范围扩展(粗化)到整个操作序列的外部
synchronized(obj){ //...}synchronized(obj){ //...}synchronized(obj){ //...}
synchronized(obj){ //... //.. //...}
轻量级锁
轻量级锁是相对于传统的重量级锁而言,它使用 CAS 操作来避免重量级锁使用互斥量的开销
偏向锁
偏向于让第一个获取锁对象的线程,这个线程在之后获取该锁就不再需要进行同步操作,甚至连 CAS 操作也不再需要
同步工具类设计
状态依赖性管理
// 可阻塞的状态依赖操作结构获取锁while(前置条件不满足) { 释放锁 等待至前置条件满足 如果超时或者被interrupt则失败}执行动作释放锁
调用者处理失败
简单地将失败传递给调用者:只是将处理失败的职责从服务代码转移到客户代码
synchroized void put() { if (full) throw Excetion putVal()}
自旋阻塞
这种方式的问题在于如果线程一进入休眠,条件马上变为真,此时会浪费大量的时间在休眠上
void put(){ while(true) { synchroized(this) { if (full) { Thread.sleep(1999); continue; } putVal() return; } }}
条件队列
synchroized void put() { // wait会释放锁 // 当从wait中恢复,也就是被唤醒了,此时又获得了这把锁 // 等待的这个条件必须在变真时,以某种形式发出通知 否则死锁 while(full) wait(); // 即使被唤醒了 也不代表前置条件为真了 所以wait必须在一个循环中 doPut(); notifyAll();}
显式Condtion
这种方式相较于条件队列拥有更多的功能:可中断不可中断等待、基于时限的等待、公平等待
notFull = lock.newCondtion();notEmpty = lock.newCondtion();...void put(){ lock.lock(); while(full) notFull.await(); putVal(); notEmpty.singnal(); lock.unlock(); // 应该使用finally释放}
并发编程良好实践
- 给线程起名字
- 缩小同步范围
- 多用同步工具少用原始的wait,notify
- 使用阻塞队列
- 多用 ConcurrentHashMap 而不是 Hashtable
- 使用栈封闭以及不变性保证线程安全
- 使用线程池