深入java同步

多线程并发是一个大的topic,java本身也提供了很多工具来帮助我们更好的开发多线程程序。

1. 显示锁

synchronized

jvm内置锁,可重入,不保证公平性。 因为synchronized内置jvm,jvm可以对其做更多的优化,比如:对同一个对象反复的synchromize可以合并一个,轻量锁和偏向锁,用cas的方式优化了在竞争情况很少的场景下的性能。

reentrantlock锁

也能重入,比synchronized更加灵活,满足更多的使用场景。

  • 等待可中断,lock.lockInterruptibly()
  • 可以超时的等待锁,lock.tryLock(1000)
  • 可实现公平锁,new ReentrantLock(true)
  • 可绑定多个条件,lock.newCondition()

CountDownLatch 和 CyclicBarrier

闭锁和栅栏。提供常见的线程间阻塞等待,方便使用。

闭锁是一次性对象,一旦value减少到0,所有线程就可以执行了。可以用于等待其它线程都执行完毕;所有线程一起启动等。

栅栏可以重复使用,等待所有线程都调用await,所有线程就可以开始执行。

条件队列

类似条件变量(condition variables)提供了原子释放锁并阻塞的机制。可以对对象调用wait(Object.wait)。必须持有对象锁才能wait,wait的时候会释放锁,从wait唤醒时会再次请求拥有锁。这是一个谨慎的设计。

出了Object.wait,通过锁也能显式创建Condition对象,LinkedBlockQueue就用到了condition:

class LinkedBlockingQueue<T>{
    Lock lock = new ReentrantLock();
    private final Condition notfull = lock.newCondition();

    public void Put(T x){
        lock.lock();
        try{
            while(count == items.length){
                notFull.await();
            }
        }finally{
            lock.unlock();
        }
    }
}

Object.wait的jvm实现:

\\调用objectMonitorwait方法 ,objectMonitor也是实现javasynchronized的源码 synchronizer.cpp
monitor->wait(millis, true, THREAD);
\\最终调用park阻塞线程
Self->_ParkEvent->park () ;

2. 并发容器

java能取得如此好的性能,与jdk实现了众多优秀的数据结构分不开。

在开发并发程序的时候,可以使用支持并发的容器,而不是hashmap,arraylist等。 常见使用:

  • CopyOnWriteArrayList 监听器可以使用
  • LinedBlockingQueue和ArrayBlockingQueue:生产者,消费者的场景可以使用
  • concurrentHashMap:支持并发的map,1.8用cas算法实现

3. AQS

java中大部分锁和同步机制都是继承AQS类实现的,包括最常用的ReentrantLock 。 AQS提供了3个基本的机制。包括一个int变量来代表状态和线程等待队列,子类来具体决定原子整数的作用;线程等待队列;和管理线程的阻塞和唤醒。

  1. 原子变量
/**
* The synchronization state.
*/
private volatile int state;

AQS中用CAS指令来保证state的原子操作。

CAS的ABA问题

LockSupport.park(this);

在CAS中,通过比较V的值时候仍然是A来进行操作,。但是可能存在。V的值变成B,然后又变成A的情况。大部分情况这个不会影响程序的正确性,如果需要解决aba问题,可以用AtomicStampedReference。

  1. 线程阻塞

通过LockSupport提供了基础的线程阻塞机制。最终调用unsafe.part()。

在windows系统中最终调用等待函数来等待事件对象。linux没有一个统一个wait函数,不同的内核对象有其对应的函数。linux用互斥量和条件变量来实现线程阻塞 。

相关代码片段:

//linux  park.hpp
class PlatformParker : public CHeapObj {
  protected:
    pthread_mutex_t _mutex [1] ;
    pthread_cond_t  _cond  [1] ;
}
//调用系统api阻塞线程
pthread_cond_wait (_cond, _mutex)
//windows
class PlatformParker : public CHeapObj {
  protected:
    //Param2:手动重置事件 Param3:初始状态为false
    HANDLE _ParkEvent = CreateEvent (NULL, true, false, NULL);
}
//调用javaThread的park
thread->parker()->park(isAbsolute != 0, time);
//最终调用windows系统方法
WaitForSingleObject(_ParkEvent, time);
//唤醒线程
SetEvent (_ParkEvent)

注:不同的操作系统在提供同步机制时,会有相同的术语,比如信号量,互斥量等。但实际的效果和作用可能并不一致。

  1. 线程等待队列
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;

4. ThreadLocal

线程本地变量,线程Thread的成员变量,实现了线程之间的数据隔离,为了避免加锁竞争。

这个成员变量是ThreadLocalMap。

public class Thread implements Runnable {
    /* ThreadLocal values pertaining to this thread. This map is maintained
     * by the ThreadLocal class. */
    ThreadLocal.ThreadLocalMap threadLocals = null;
}

ThreadLocalMap中的细节

  1. map的entry为弱应用,threadlocal实例对象this去map过去值。
static class Entry extends WeakReference<ThreadLocal<?>> {
        /** The value associated with this ThreadLocal. */
        Object value;
        Entry(ThreadLocal<?> k, Object v) {
            super(k);
            value = v;
        }
}
  1. ThreadLocalMap处理hash冲突是通过开放地址法。因为存储元素少,并且本身key的hash离散型比较好,所以冲突几率低。

框架中的使用

常用的有ThreadLocalRandom,在全局的池化对象,如内存池,线程池都可以用到这个技术,来避免线程竞争。

虽然平常编程很少会用,但是很多框架都用到了。netty框架甚至重新实现了FastThreadLocal,见netty的文章。

spring事务对数据库连接的管理

ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal("Transactional resources");

Updated: