J.U.C之并发容器(CopyOnWriteArrayList, ConcurrentHashMap)

并发容器和同步容器

同步容器

同步容器常见的有Vector,HashTable等,这些类实现线程安全的方式是:将它们的状态封装起来,并对每个公有方法都进行同步(使用synchronized关键字来修饰)使得每次只有一个线程能访问容器的状态。
在多线程场景下,能安全的使用同步容器所附带的方法,但是在某些情况下可能需要额外的客户端加锁来保护复合操作。常见的复合操作:迭代,跳转(根据指定顺序找到当前元素的下一个元素)以及条件运算。
所以同步容器不光削弱了并发性,降低了吞吐量,在某些符合复合操作的场景下任然会有着线程安全问题。

public static Object getLast (Vector list) {
int lastIndex = list.size - 1;
return list.get(lastIndex);
}

public static void deleteLast (Vector list) {
int lastIndex = list.size - 1;
list.remove(lastIndex);
}

若有多个线程交替执行上述两个方法,则可能会面临着线程安全问题,如下图所示:

要解决上述问题就需要客户端加锁。

public static Object getLast (Vector list) {
synchronized(list){
int lastIndex = list.size - 1;
return list.get(lastIndex);
}
}

public static void deleteLast (Vector list) {
synchronized(list){
int lastIndex = list.size - 1;
list.remove(lastIndex);
}
}

对于同步容器的迭代操作也会面临线程不安全情况,如果在迭代过程中,有其他的线程删除了一个元素,并且迭代过程和删除过程交替执行,那么将会抛出ArrayIndexOutOfBoundsException异常。

for(int i = 0; i < vector.size(); i++) {
doSomething(vector.get(i));
}

也是可以通过客户端加锁来解决。

synchronized(vector){
for(int i = 0; i < vector.size(); i++) {
doSomething(vector.get(i));
}
}

并发容器

JAVA 5.0 提供了多种并发容器类来改进同步容器的性能,通过并发容器来代替同步容器,可以极大地提高伸缩性并降低风险。主要是因为同步容器将所有对容器状态的访问都串行化,实现它们的线程安全性,但是这种的代价严重降低了并发性,当多个线程竞争容器的锁时,吞吐量严重降低。
并发容器是针对多个线程并发访问设计的,也提供了一些在使用同步容器时需要客户端加锁的复合操作,在Java 5.0 增加了ConcurrentHashMap用来替代同步且基于散列的Map,以及CopyOnWriteArrayList用于在遍历操作为主要操作的情况下替代同步的List,Java 6 也引入了ConcurrentSkipListMap 和 ConcurrentSkipListSet,分别作为同步的SortedMap(TreeMap) 和 SortedSet(TreeSet)的并发替代品。

常见并发容器

CopyOnWriteArrayList

概述

Copy-On-Write简称COW,是一种用于程序设计中的优化策略。其基本思路是,从一开始大家都在共享同一个内容,当某个人想要修改这个内容的时候,才会真正把内容Copy出去形成一个新的内容然后再改,这是一种延时懒惰策略。从JDK1.5开始Java并发包里提供了两个使用CopyOnWrite机制实现的并发容器,它们是CopyOnWriteArrayList和CopyOnWriteArraySet。
copyOnWriteArrayList 用于替代同步List(除此之外,还有copyOnWriteArraySet来替代同步的Set),在某些情况下提供了更好的并发性能,在迭代期间(读取)不需要对容器进行加锁或复制。
“写入时复制(Copy-On-Write)” 容器的线程安全性在于,只要正确发布一个事实不可变对象,那么在访问该对象时就不需要进一步的同步。
在每次修改时,都会创建并重新发布一个新的容器副本,从而实现可变性,修改操作都在新的容器副本上进行,那么旧的容器可以看作为一个事实不可变对象,并且在修改时使用ReentrantLock锁来保证同步。
在对容器进行迭代的时候,"写入时复制"容器的迭代器保留一个指向底层基础数组的引用,这个数组当前位于迭代器的初始位置,由于它不会被修改,因此在对其进行同步时只需确保数组内容的可见性。因此,多个线程可以同时对这个容器进行迭代,而不会彼此干扰或与修改容器的线程相互干扰。
"写入时复制"容器返回的迭代器不会抛出ConcurrentModificationException,并且返回的元素和迭代器创建时的元素完全一致,而不必考虑修改所带来的影响。、
CopyOnWriteAarryList并发容器用于读多写少的并发场景。比如白名单,黑名单,商品类目的访问和更新场景。

源码分析

  1. add()方法
    add方法向CopyOnWirteArrayList中添加元素,在添加过程中会使用一个ReentrantLock来保证添加操作的线程安全性。
    由于每当修改容器时都会复制底层数组,需要花费一定的开销,尤其是当容器规模较大时,只有迭代操作远远大于修改操作时才会使用"写入时复制"容器。

    public boolean add(E e) {
    // 使用一个可重入锁来保证同步
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
    // 得到旧数组
    Object[] elements = getArray();
    // 记录旧数组长度
    int len = elements.length;
    // 创建一个新数组, 并将旧数组上的元素复制到新数组
    Object[] newElements = Arrays.copyOf(elements, len + 1);
    // 添加元素到新数组中
    newElements[len] = e;
    // 新数组的引用重新只需旧数组
    setArray(newElements);
    return true;
    } finally {
    lock.unlock();
    }
    }
  2. get()方法
    get方法通过getArray方法获得旧数组,然后根据index索引来得到元素。因为在读取期间,可能有线程根据旧的数组copy了一个新的数组,在新的数组上进行添加操作,但是get方法操作的还是旧数组,那么get方法可能得到是"脏"数组。

    public E get(int index) {
    // 通过getArray方法得到旧数组
    // 通过index索引来得到元素
    return get(getArray(), index);
    }

    private E get(Object[] a, int index) {
    return (E) a[index];
    }
  3. iterator()方法
    在对容器进行迭代的时候,"写入时复制"容器的迭代器保留一个指向底层基础数组(旧数组)的引用,由于它不会被修改,因此在对其进行同步时只需确保数组内容的可见性。因此,多个线程可以同时对这个容器进行迭代,而不会彼此干扰或与修改容器的线程相互干扰。
    "写入时复制"容器返回的迭代器不会抛出ConcurrentModificationException,并且返回的元素和迭代器创建时的元素完全一致,而不必考虑修改所带来的影响。

    // 然后迭代器对象的时候, 会把当前数组传递给迭代器进行备份
    public Iterator<E> iterator() {
    return new COWIterator<E>(getArray(), 0);
    }
    // CopyOnWriteArrayList 迭代器
    private COWIterator(Object[] elements, int initialCursor) {
    cursor = initialCursor;
    snapshot = elements;
    }

应用缺点

  1. 内存占用问题。因为CopyOnWrite的写时复制机制,所以在进行写操作的时候,内存里会同时驻扎两个对象的内存,旧的对象和新写入的对象(写的时候会创建新容器来保持数据,而旧容器的对象还在使用,所以有两份对象内存)。如果这些对象占用的内存比较大,,那么这个时候很有可能造成频繁的Yong GC和Full GC。
  2. 数据一致性问题。CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。所以如果你希望写入的的数据,马上能读到,请不要使用CopyOnWrite容器。

参考资料

[1] https://www.cnblogs.com/dolphin0520/p/3938914.html

ConcurrentHashMap

概述

在线程不安全的HashMap以及(多线程操作下可能会形成环形链表)效率低下的HashTable(HashTable虽说是线程安全的,但是它是通过全表锁来实现的,即简单粗暴的使用synchronized关键字对put方法加锁,导致了在多线程访问下所有操作的串行话)的背景下,急需一个满足多线程安全并且高效率的Hash表并发容器。
ConcurrentHashMap 在HashTable的基础上进行了优化,他使用了分段锁技术来达到线程安全要求,其将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。
并且ConcurrentHashMap可以做到读取数据不加锁,并且其内部的结构可以让其在进行写操作的时候能够将锁的粒度保持地尽量地小,不用对整个ConcurrentHashMap加锁。

源码分析

  1. JDK1.7 源码
    ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment继承了可重入锁ReentrantLock,在ConcurrentHashMap里扮演锁的角色,HashEntry则用于存储键值对数据。
    一个ConcurrentHashMap里包含一个Segment数组,Segment的结构是一种数组和链表结构, 一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素,可以将一个Segment可以看作是一个类似于HashMap结构。当对HashEntry数组的数据进行修改时,必须首先获得它对应的Segment锁。

    ConcurrentHashMap定位一个元素的过程需要进行两次Hash操作,第一次Hash定位到Segment,第二次Hash定位到元素所在的链表的头部,因此,这一种结构的带来的副作用是Hash的过程要比普通的HashMap要长,但是带来的好处是写操作的时候可以只对元素所在的Segment进行加锁即可,不会影响到其他的Segment。
    扩容操作只使用了HashEntry数组,Segment数组初始化之hou就不可变。

    1. Segment/HashEntry
      Segment定义如下:

      static final class Segment<K,V> extends ReentrantLock implements Serializable { 
      // Segment中元素的数量
      transient volatile int count;
      // 对HashEntry数组的操作次数(put/remove, 修改不算)
      transient int modCount;
      // 阈值, Segment里面元素的数量超过这个值依旧就会对Segment进行扩容
      transient int threshold;
      // HashEntry数组, 数组中的每一个元素代表了一个链表的头部
      transient volatile HashEntry<K,V>[] table;
      // 负载因子, 用于确定threshold
      final float loadFactor;
      …….
      }

      HashEntry定义如下:

      static final class HashEntry<K,V> { 
      final K key;
      // 根据key的hashCode计算得到的hash值
      // 用来定位HashEntry在HashEntry数组中的位置
      final int hash;
      volatile V value;
      // next为final类型, 这代表着next指针不可修改
      // 所以put操作都将新的HashEntry节点放置在链表头部
       final HashEntry<K,V> next;
      ……
      }
    2. 构造函数

      // 不指定则为默认参数:
      //    initialCapacity:16
      //    loadFactor:0.75
      //    concurrencyLevel:16    
      public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
      // 参数检验
      if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
      throw new IllegalArgumentException();
      // 当concurrencyLevel大于Segment最大数量, 进行重置
      // MAX_SEGMENTS 为1 << 16 = 65536, 也就是最大并发数为65536
      if (concurrencyLevel > MAX_SEGMENTS)
      concurrencyLevel = MAX_SEGMENTS;
      int sshift = 0;
      int ssize = 1;
      // 根据concurrencyLevel得到合适的segment数组长度ssize
      // 为了更好根据hash值计算索引, segment长度需要为2的n次方
      while (ssize < concurrencyLevel) {
      ++sshift;
      ssize <<= 1;
      }
      // segmentShift:段偏移量 segmentMask:段掩码, 这两个变量在定位segment时用来计算segment索引
      this.segmentShift = 32 - sshift;
      this.segmentMask = ssize - 1;
      if (initialCapacity > MAXIMUM_CAPACITY)
      initialCapacity = MAXIMUM_CAPACITY;
      // 计算cap的大小, 即Segment中HashEntry的数组长度, cap也一定为2的n次方
      int c = initialCapacity / ssize;
      if (c * ssize < initialCapacity)
      ++c;
      int cap = MIN_SEGMENT_TABLE_CAPACITY;
      while (cap < c)
      cap <<= 1;
      //创建segments数组并初始化第一个Segment,其余的Segment延迟初始化
      Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
      (HashEntry<K,V>[])new HashEntry[cap]);
      // 创建Segment数组
      Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
      UNSAFE.putOrderedObject(ss, SBASE, s0);
      this.segments = ss;
      }

      从源码可以知道,Segment数组的大小ssize是由concurrentLevel来决定的,但是却不一定等于concurrentLevel,ssize一定是大于或等于concurrentLevel的最小的2的次幂。这种设计其实主要是便于通过按位与的散列算法来定位Segment的index,即用hash & (n - 1) 代替 hash % n。
      构造函数执行流程如下:

      1. 必要参数校验;
      2. 校验并发级别 concurrencyLevel 大小,如果大于最大值,重置为最大值;
      3. 寻找并发级别 concurrencyLevel 之上最近的 2 的幂次方值,作为初始化容量大小;
      4. 记录 segmentShift 偏移量,这个值为 容量 = 2 的N次方 中的 N,在后面 Put 时计算位置时会用到,默认是 32 - sshift = 28;
      5. 记录 segmentMask,默认是 ssize - 1 = 16 -1 = 15;
      6. 计算HashEntry数组的长度;
      7. 初始化 segments[0],默认大小为 2,负载因子 0.75,扩容阀值是 2 * 0.75=1.5,插入第二个值时才会进行扩容。
    3. put方法

      public V put(K key, V value) {
      //concurrentHashMap不允许key/value为空
      Segment<K,V> s;
      if (value == null)
      throw new NullPointerException();
      // 根据key的hashCode得到hash值
      //hash函数对key的hashCode重新散列,避免差劲的不合理的hashcode,保证散列均匀
      int hash = hash(key.hashCode());
      / 返回的hash值无符号右移segmentShift位与段掩码进行位运算,定位segment
      // 默认segmentShift为28, segmentMask为15
      // 即 hash >>> 28 & 15, 即高四位相与于1111
      int j = (hash >>> segmentShift) & segmentMask;
      if ((s = (Segment<K,V>)UNSAFE.getObject
      (segments, (j << SSHIFT) + SBASE)) == null)
      // 如果查找到的 Segment 为空, 初始化
      s = ensureSegment(j);
      return s.put(key, hash, value, false);
      }

      上述put方法主要是根据key值计算segment的位置,并且如果segment为空,调用ensureSegment方法进行初始化。

      private Segment<K,V> ensureSegment(int k) {
      final Segment<K,V>[] ss = this.segments;
      long u = (k << SSHIFT) + SBASE; // raw offset
      Segment<K,V> seg;
      // 判断 u 位置的 Segment 是否为null
      if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
      Segment<K,V> proto = ss[0]; // use segment 0 as prototype
      // 获取0号 segment 里的 HashEntry<K,V> 初始化长度
      int cap = proto.table.length;
      // 获取0号 segment 里的 hash 表里的扩容负载因子,所有的 segment 的 loadFactor 是相同的
      float lf = proto.loadFactor;
      // 计算扩容阀值
      int threshold = (int)(cap * lf);
      // 创建一个 cap 容量的 HashEntry 数组
      HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
      if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck
      // 再次检查 u 位置的 Segment 是否为null,因为这时可能有其他线程进行了操作
      Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
      // 自旋检查 u 位置的 Segment 是否为null
       while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
      // 使用CAS 赋值,只会成功一次
      if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
      break;
      }
      }
      }
      return seg;
      }

      当获取的segment为空,调用ensureSegment方法进行初始化。初始化流程如下:

      1. 检查计算得到的位置的 Segment 是否为null;
      2. 为 null 继续初始化,使用 Segment[0] 的容量和负载因子创建一个 HashEntry 数组;
      3. 再次检查计算得到的指定位置的 Segment 是否为null;
      4. 使用创建的 HashEntry 数组初始化这个 Segment;
      5. 自旋判断计算得到的指定位置的 Segment 是否为null,使用 CAS 在这个位置赋值为 Segment;
      // 找到了指定的segment之后, 将数据存放到其HashEntry数组对应位置
      final V put(K key, int hash, V value, boolean onlyIfAbsent) {
      // 获取 ReentrantLock 独占锁,获取不到, scanAndLockForPut 获取, 该方法会不断自旋tryLock获取锁, 当自旋次数大于指定次数时lock()阻塞获取锁, 并且返回表头node节点
      HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
      V oldValue;
      try {
      HashEntry<K,V>[] tab = table;
      // 计算要put的数据在HashEntry的哪个位置
      int index = (tab.length - 1) & hash;
      // CAS 获取 index 坐标的值
      HashEntry<K,V> first = entryAt(tab, index);
      // 遍历当前位置HashEntry链表, 找到合适的插入位置
      for (HashEntry<K,V> e = first;;) {
      if (e != null) {
      // 检查是否 key 已经存在,如果存在,则遍历链表寻找位置,找到后替换 value
      K k;
      // 已经存在了相同的key, 直接用新值覆盖
      if ((k = e.key) == key ||
      (e.hash == hash && key.equals(k))) {
      oldValue = e.value;
      if (!onlyIfAbsent) {
      e.value = value;
      ++modCount;
      }
      break;
      }
      e = e.next;
      }
      else {
      // 发生了地址冲突, 采用链表头插法进行插入
      if (node != null)
      node.setNext(first);
      else
      // 当前HashEntry链表还是为空
      node = new HashEntry<K,V>(hash, key, value, first);
      int c = count + 1;
      // 容量大于扩容阀值,小于最大容量,进行扩容
      if (c > threshold && tab.length < MAXIMUM_CAPACITY)
      rehash(node);
      else
      // index 位置赋值 node, node 可能是一个元素, 也可能是一个链表的表头
      setEntryAt(tab, index, node);
      ++modCount;
      count = c;
      oldValue = null;
      break;
      }
      }
      } finally {
      unlock();
      }
      return oldValue;
      }

      上述put方法完成真正的数据存放操作,即将数组存入到HashEntry中。具体执行流程如下:

      1. tryLock() 获取锁,获取不到使用 scanAndLockForPut 方法继续获取。scanAndLockForPut 该方法会不断自旋tryLock()获取锁,当自旋次数大于指定次数时lock()阻塞获取锁, 获取锁成功后返回返回要存放的node节点(根据传入的Key,Value构造);
      2. 计算 put 的数据要放入的 index 位置,然后获取这个位置上的 HashEntry ;
      3. 遍历HashEntry链表以便得到合适位置来存放新元素,分为以下两种情况:
        1. 如果这个位置上的 HashEntry 存在:
          • 判断链表当前元素 Key 和 hash 值是否和要 put 的 key 和 hash 值一致,一致则替换值;
          • 不一致,获取链表下一个节点,直到发现相同进行值替换,或者链表遍历完毕没有相同的。
        2. 如果这个位置上的 HashEntry 不存在:
          • 如果当前容量大于扩容阀值,小于最大容量,进行扩容;
          • 直接头插法插入。
    4. rehash方法

      // rehash扩容只针对HashEntry的扩容, 不会对Segment机械能扩容
      private void rehash(HashEntry<K,V> node) {
      HashEntry<K,V>[] oldTable = table;
       // 老容量
      int oldCapacity = oldTable.length;
      // 新容量, 扩大两倍
      int newCapacity = oldCapacity << 1;
      // 新的扩容阀值
      threshold = (int)(newCapacity * loadFactor);
      // 创建新的数组
      HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity];
      // 新的掩码, 默认2扩容后是4, 4 - 1 = 3, 二进制就是11
      int sizeMask = newCapacity - 1;
      for (int i = 0; i < oldCapacity ; i++) {
      // 遍历老数组
      HashEntry<K,V> e = oldTable[i];
      if (e != null) {
      HashEntry<K,V> next = e.next;
      // 计算新的位置,新的位置只可能是不变或者是老的位置+老的容量
      int idx = e.hash & sizeMask;
      if (next == null)
      // 如果当前位置还不是链表,只是一个元素,直接赋值
      newTable[idx] = e;
      else {
      // 如果是链表了
      HashEntry<K,V> lastRun = e;
      int lastIdx = idx;
      // 新的位置只可能是不变或者是老的位置+老的容量
      // 遍历结束后,lastRun 后面的元素位置都是相同的
      for (HashEntry<K,V> last = next; last != null; last = last.next) {
      int k = last.hash & sizeMask;
                                    
      if (k != lastIdx) {
      lastIdx = k;
      lastRun = last;
      }
      }
      // lastRun 后面的元素位置都是相同的, 直接作为链表赋值到新位置
      newTable[lastIdx] = lastRun;
      // 遍历剩余元素,头插法到指定 k 位置
      for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
      V v = p.value;
      int h = p.hash;
      int k = h & sizeMask;
      HashEntry<K,V> n = newTable[k];
      newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
      }
      }
      }
      }
        
      // 头插法插入新的节点
      int nodeIndex = node.hash & sizeMask;
      node.setNext(newTable[nodeIndex]);
      newTable[nodeIndex] = node;
      table = newTable;
      }

      扩容只会扩容到原来的两倍。老数组里的数据移动到新的数组时,位置要么不变,要么变为 index+ oldSize,参数里的 node 会在扩容之后使用链表头插法插入到指定位置。
      源码种的第一个 for 是为了寻找这样一个节点,这个节点后面的所有 next 节点的新位置都是相同的,然后把这个作为一个链表赋值到新位置。第二个 for 循环是为了把剩余的元素通过头插法插入到指定位置链表。

    5. get方法

      public V get(Object key) {
      Segment<K,V> s;
      HashEntry<K,V>[] tab;
      int h = hash(key);
      // 得到segment索引
      long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
      if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
      (tab = s.table) != null) {
      // 得到在HashEntry数组中的索引
      for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
      (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
      e != null; e = e.next) {
      // 如果是链表, 遍历查找到相同 key 的 value
      K k;
      if ((k = e.key) == key || (e.hash == h && key.equals(k)))
      return e.value;
      }
      }
      return null;
      }

      get方法的具体流程如下:

      1. 计算得到 key 的存放位置;
      2. 遍历指定位置查找相同 key 的 value 值。
        get方法无需加锁,由于其中涉及到的共享变量都使用volatile修饰,volatile可以保证内存可见性,所以不会读取到过期数据。
  2. JDK1.8 源码
    Java8 中的 ConcruuentHashMap 使用的 Synchronized 锁加 CAS 的机制。结构也由 Java7 中的 Segment 数组 + HashEntry 数组 + 链表 进化成了 Node 数组 + 链表 / 红黑树,Node 是类似于一个 HashEntry 的结构。它的冲突再达到一定大小时会转化成红黑树,在冲突小于一定数量时又退回链表。
    源码分析!挖坑~ 现在太累了/(ㄒoㄒ)/~~ 都晚上12点了都!

参考资料

[1] https://github.com/Snailclimb/JavaGuide/blob/master/docs/java/collection/ConcurrentHashMap源码+底层数据结构分析.md
[2] https://www.cnblogs.com/chengxiao/p/6842045.html

Author: HB
Link: http://www.huangbin.fun/J-U-C之并发容器-CopyOnWriteArrayList-ConcurrentHashMap.html
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.