深入java同步
多线程并发是一个大的topic,java本身也提供了很多工具来帮助我们更好的开发多线程程序。
1. 显示锁
synchronized
jvm内置锁,可重入,不保证公平性。 因为synchronized内置jvm,jvm可以对其做更多的优化,比如:对同一个对象反复的synchromize可以合并一个,轻量锁和偏向锁,用cas的方式优化了在竞争情况很少的场景下的性能。
reentrantlock锁
也能重入,比synchronized更加灵活,满足更多的使用场景。
- 等待可中断,
lock.lockInterruptibly()
- 可以超时的等待锁,
lock.tryLock(1000)
- 可实现公平锁,
new ReentrantLock(true)
- 可绑定多个条件,
lock.newCondition()
CountDownLatch 和 CyclicBarrier
闭锁和栅栏。提供常见的线程间阻塞等待,方便使用。
闭锁是一次性对象,一旦value减少到0,所有线程就可以执行了。可以用于等待其它线程都执行完毕;所有线程一起启动等。
栅栏可以重复使用,等待所有线程都调用await,所有线程就可以开始执行。
条件队列
类似条件变量(condition variables)提供了原子释放锁并阻塞的机制。可以对对象调用wait(Object.wait)。必须持有对象锁才能wait,wait的时候会释放锁,从wait唤醒时会再次请求拥有锁。这是一个谨慎的设计。
出了Object.wait,通过锁也能显式创建Condition对象,LinkedBlockQueue就用到了condition:
class LinkedBlockingQueue<T>{
Lock lock = new ReentrantLock();
private final Condition notfull = lock.newCondition();
public void Put(T x){
lock.lock();
try{
while(count == items.length){
notFull.await();
}
}finally{
lock.unlock();
}
}
}
Object.wait的jvm实现:
\\调用objectMonitor的wait方法 ,objectMonitor也是实现javasynchronized的源码 synchronizer.cpp
monitor->wait(millis, true, THREAD);
\\最终调用park阻塞线程
Self->_ParkEvent->park () ;
2. 并发容器
java能取得如此好的性能,与jdk实现了众多优秀的数据结构分不开。
在开发并发程序的时候,可以使用支持并发的容器,而不是hashmap,arraylist等。 常见使用:
- CopyOnWriteArrayList 监听器可以使用
- LinedBlockingQueue和ArrayBlockingQueue:生产者,消费者的场景可以使用
- concurrentHashMap:支持并发的map,1.8用cas算法实现
3. AQS
java中大部分锁和同步机制都是继承AQS类实现的,包括最常用的ReentrantLock 。 AQS提供了3个基本的机制。包括一个int变量来代表状态和线程等待队列,子类来具体决定原子整数的作用;线程等待队列;和管理线程的阻塞和唤醒。
- 原子变量
/**
* The synchronization state.
*/
private volatile int state;
AQS中用CAS指令来保证state的原子操作。
CAS的ABA问题
LockSupport.park(this);
在CAS中,通过比较V的值时候仍然是A来进行操作,。但是可能存在。V的值变成B,然后又变成A的情况。大部分情况这个不会影响程序的正确性,如果需要解决aba问题,可以用AtomicStampedReference。
- 线程阻塞
通过LockSupport提供了基础的线程阻塞机制。最终调用unsafe.part()。
在windows系统中最终调用等待函数来等待事件对象。linux没有一个统一个wait函数,不同的内核对象有其对应的函数。linux用互斥量和条件变量来实现线程阻塞 。
相关代码片段:
//linux park.hpp
class PlatformParker : public CHeapObj {
protected:
pthread_mutex_t _mutex [1] ;
pthread_cond_t _cond [1] ;
}
//调用系统api阻塞线程
pthread_cond_wait (_cond, _mutex)
//windows
class PlatformParker : public CHeapObj {
protected:
//Param2:手动重置事件 Param3:初始状态为false
HANDLE _ParkEvent = CreateEvent (NULL, true, false, NULL);
}
//调用javaThread的park
thread->parker()->park(isAbsolute != 0, time);
//最终调用windows系统方法
WaitForSingleObject(_ParkEvent, time);
//唤醒线程
SetEvent (_ParkEvent)
注:不同的操作系统在提供同步机制时,会有相同的术语,比如信号量,互斥量等。但实际的效果和作用可能并不一致。
- 线程等待队列
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;
4. ThreadLocal
线程本地变量,线程Thread的成员变量,实现了线程之间的数据隔离,为了避免加锁竞争。
这个成员变量是ThreadLocalMap。
public class Thread implements Runnable {
/* ThreadLocal values pertaining to this thread. This map is maintained
* by the ThreadLocal class. */
ThreadLocal.ThreadLocalMap threadLocals = null;
}
ThreadLocalMap中的细节
- map的entry为弱应用,threadlocal实例对象this去map过去值。
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
- ThreadLocalMap处理hash冲突是通过开放地址法。因为存储元素少,并且本身key的hash离散型比较好,所以冲突几率低。
框架中的使用
常用的有ThreadLocalRandom,在全局的池化对象,如内存池,线程池都可以用到这个技术,来避免线程竞争。
虽然平常编程很少会用,但是很多框架都用到了。netty框架甚至重新实现了FastThreadLocal,见netty的文章。
spring事务对数据库连接的管理
ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal("Transactional resources");