java常见并发容器

转载自https://github.com/Snailclimb/JavaGuide (添加小部分笔记)感谢作者!

JDK提供的容器,大部分在java.util.concurrent包中

  • ConcurrentHashMap:线程安全的HashMap
  • CopyOnWriteArrayList:线程安全的List,在读多写少的场合性能非常好,远好于Vector
  • ConcurrentLinkedQueue:高效的并发队列,使用链表实现,可以看作一个线程安全的LinkedList,是一个非阻塞队列
  • BlockingQueue:这是一个接口,JDK内部通过链表、数组等方式实现了该接口。表示阻塞队列,非常适合用于作为数据共享的通道
  • ConcorrentSkipListMap:跳表的实现,是一个Map,使用跳表的数据结构进行快速查找

ConcurrentHashMap #

  • HashMap是线程不安全的,并发场景下要保证线程安全,可以使用Collections.synchronizedMap()方法来包装HashMap,但这是通过使用一个全局的锁同步不同线程间的并发访问,因此会带来性能问题
  • 建议使用ConcurrentHashMap,不论是读操作还是写操作都能保证高性能:读操作(几乎)不需要加锁,而写操作时通过锁分段(这里说的是JDK1.7?)技术,只对所操作的段加锁而不影响客户端对其他段的访问

CopyOnWriteArrayList #

//源码
public class CopyOnWriteArrayList<E>
extends Object
implements List<E>, RandomAccess, Cloneable, Serializable
 
  • 在很多应用场景中,读操作可能会远远大于写操作
  • 我们应该允许多个线程同时访问List内部数据(针对读)
  • 与ReentrantReadWriteLock读写锁思想非常类似,即读读共享写写互斥读写互斥写读互斥
  • 不一样的是,CopyOnWriteArrayList读取时完全不需要加锁,且写入也不会阻塞读取操作,只有写入和写入之间需要同步等待

CopyOnWriteArrayList是如何做到的 #

  • CopyOnWriteArrayList 类的所有可变操作(add,set 等等)都是通过创建底层数组的新副本来实现的。当 List 需要被修改的时候,并不修改原有内容,而是对原有数据进行一次复制,将修改的内容写入副本。写完之后,再将修改完的副本替换原来的数据,这样就可以保证写操作不会影响读操作了。
  • CopyOnWriteArrayList 的名字就能看出 CopyOnWriteArrayList 是满足 CopyOnWrite
  • 在计算机,如果你想要对一块内存进行修改时,我们不在原有内存块中进行写操作,而是将内存拷贝一份,在新的内存中进行写操作,写完之后呢,就将指向原来内存指针指向新的内存(注意,是指向,而不是重新拷贝★重要★),原来的内存就可以被回收掉了

CopyOnWriteArrayList 读取和写入源码简单分析 #

  • CopyOnWriteArrayList读取操作的实现 读取操作没有任何同步控制操作,理由就是内部数组array不会发生修改,只会被另一个array替换,因此可以保证数据安全

      /** The array, accessed only via getArray/setArray. */
        private transient volatile Object[] array;
        public E get(int index) {
            return get(getArray(), index);
        }
        @SuppressWarnings("unchecked")
        private E get(Object[] a, int index) {
            return (E) a[index];
        }
        final Object[] getArray() {
            return array;
        }
    
  • CopyOnWriteArrayList写入操作的实现 在添加集合的时候加了锁,保证同步,避免多线程写的时候会copy出多个副本

    /**
         * Appends the specified element to the end of this list.
         *
         * @param e element to be appended to this list
         * @return {@code true} (as specified by {@link Collection#add})
         */
        public boolean add(E e) {
            final ReentrantLock lock = this.lock;
            lock.lock();//加锁
            try {
                Object[] elements = getArray();
                int len = elements.length;
                Object[] newElements = Arrays.copyOf(elements, len + 1);//拷贝新数组
                newElements[len] = e;
                setArray(newElements);
                return true;
            } finally {
                lock.unlock();//释放锁
            }
        }
    

ConcurrentLinkedQueue #

  • Java提供的线程安全的Queue分为阻塞队列非阻塞队列
  • 阻塞队列的典型例子是BlockingQueue非阻塞队列的典型例子是ConcurrentLinkedQueue
  • 阻塞队列通过锁来实现,非阻塞队列通过CAS实现
  • ConcurrentLinkedQueue使用链表作为数据结构,是高并发环境中性能最好的队列
  • ConcurrentLinkedQueue 适合在对性能要求相对较高同时对队列的读写存在多个线程同时进行的场景,即如果对队列加锁的成本较高则适合使用无锁的 ConcurrentLinkedQueue,即CAS 来替代

BlockingQueue #

阻塞队列(BlockingQueue)被广泛使用在“生产者-消费者”问题中,其原因是 BlockingQueue 提供了可阻塞的插入和移除的方法。当队列容器已满,生产者线程会被阻塞,直到队列未满;当队列容器为空时,消费者线程会被阻塞,直至队列非空时为止

BlockingQueue是一个接口,继承自Queue,而Queue又继承自Collection接口,下面是BlockingQueue的相关实现类
ly-20241212141943569

代码例子(主要是**put()take()**两个方法):

public class TestBlockingQueue {
    public static void main(String[] args) {
        BlockingQueue<String> blockingQueue
                = new ArrayBlockingQueue<>(2);
        for (int i = 10; i < 20; i++) {
            int finalI = i;
            new Thread(() -> {
                try {
                    TimeUnit.SECONDS.sleep(finalI);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                try {
                    blockingQueue.put(finalI + "");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread()
                        .getName() + "放入了元素[" + finalI + "");
            }, "线程" + i).start();
        }
        for (int i = 20; i < 30; i++) {
            int finalI = i;
            new Thread(() -> {
                try {
                    TimeUnit.SECONDS.sleep(finalI);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                String remove = null;
                try {
                    remove = blockingQueue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread()
                        .getName() + "取出了元素[" + remove + "");
            }, "线程" + i).start();
        }
    }
}
/* 由下可以知道,放入了两个元素之后,需要等待取出后,才能继续放入
线程10放入了元素[10
线程11放入了元素[11                 ----> 之后这里发生了停顿
线程20取出了元素[10
线程12放入了元素[12
线程21取出了元素[11
线程13放入了元素[13
线程22取出了元素[12
线程14放入了元素[14
线程23取出了元素[13
线程15放入了元素[15
线程24取出了元素[14
线程16放入了元素[16
线程25取出了元素[15
线程17放入了元素[17
线程26取出了元素[16
线程18放入了元素[18
线程27取出了元素[17
线程19放入了元素[19
线程28取出了元素[18
线程29取出了元素[19

Process finished with exit code 0

*/

ArrayBockingQueue #

  • ArrayBlockingQueue是BlockingQueue接口的有界队列实现类,底层采用数组来实现

    public class ArrayBlockingQueue<E>
    extends AbstractQueue<E>
    implements BlockingQueue<E>, Serializable{}
    
  • ArrayBlockingQueue 一旦创建,容量不能改变。其并发控制采用可重入锁 ReentrantLock ,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。当队列容量满时,尝试将元素放入队列将导致操作阻塞;尝试从一个空队列中取一个元素也会同样阻塞

  • ArrayBlockingQueue 默认情况下不能保证线程访问队列的公平性,所谓公平性是指严格按照线程等待的绝对时间顺序,即最先等待的线程能够最先访问到 ArrayBlockingQueue。而非公平性则是指访问 ArrayBlockingQueue 的顺序不是遵守严格的时间顺序,有可能存在,当 ArrayBlockingQueue 可以被访问时,长时间阻塞的线程依然无法访问到 ArrayBlockingQueue。如果保证公平性,通常会降低吞吐量。如果需要获得公平性的 ArrayBlockingQueue,可采用如下代码(主要是第二个参数)

    private static ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(10,true);
    

LinkedBlockingQueue #

  • 底层基于单向链表实现阻塞队列,可以当作无界队列也可以当作有界队列

  • 满足FIFO特性,与ArrayBlockingQueue相比有更高吞吐量,为防止LinkedBlockingQueue容量迅速增加,损耗大量内存,一般创建LinkedBlockingQueue对象时会指定大小****;如果未指定则容量等于Integer.MAX_VALUE

  • 相关构造方法

    /**
         *某种意义上的无界队列
         * Creates a {@code LinkedBlockingQueue} with a capacity of
         * {@link Integer#MAX_VALUE}.
         */
        public LinkedBlockingQueue() {
            this(Integer.MAX_VALUE);
        }
    
        /**
         *有界队列
         * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
         *
         * @param capacity the capacity of this queue
         * @throws IllegalArgumentException if {@code capacity} is not greater
         *         than zero
         */
        public LinkedBlockingQueue(int capacity) {
            if (capacity <= 0) throw new IllegalArgumentException();
            this.capacity = capacity;
            last = head = new Node<E>(null);
        }
    

PriorityBlockingQueue #

  • 支持优先级的无界阻塞队列,默认情况元素采用自然顺序进行排序,或通过自定义类实现compareTo()方法指定元素排序,或初始化时通过构造器参数Comparator来指定排序规则
  • PriorityBlockingQueue 并发控制采用的是可重入锁 ReentrantLock队列为无界队列ArrayBlockingQueue 是有界队列,LinkedBlockingQueue 也可以通过在构造函数中传入 capacity 指定队列最大的容量,但是 PriorityBlockingQueue 只能指定初始的队列大小,后面插入元素的时候,如果空间不够的话会自动扩容
  • 它就是 PriorityQueue 的线程安全版本不可以插入 null 值,同时,插入队列的对象必须是可比较大小的(comparable),否则报 ClassCastException 异常。它的插入操作 put 方法不会 block(是block 阻塞,不是lock 锁),因为它是无界队列(take 方法在队列为空的时候会阻塞)

ConcurrentSkipListMap #

对于一个单链表,即使链表是有序的,如果我们想要在其中查找某个数据,也只能从头到尾遍历链表,这样效率自然就会很低,跳表就不一样了。跳表是一种可以用来快速查找的数据结构,有点类似于平衡树。它们都可以对元素进行快速的查找。但一个重要的区别是:对平衡树的插入和删除往往很可能导致平衡树进行一次全局的调整。而对跳表的插入和删除只需要对整个数据结构的局部进行操作即可。这样带来的好处是:在高并发的情况下,你会需要一个全局锁来保证整个平衡树的线程安全。而对于跳表,你只需要部分锁即可。这样,在高并发环境下,你就可以拥有更好的性能。而就查询的性能而言,跳表的时间复杂度也是 O(logn) 所以在并发数据结构中,JDK 使用跳表来实现一个 Map。

跳表的本质是维护多个链表,且链表是分层ly-20241212141943852

  • 最低层的链表维护跳表内所有元素,每上面一层链表都是下面一层的子集
  • 跳表内所有链表的元素都是排序
  • 查找时,可以从顶级链表开始找。一旦发现被查找的元素大于当前链表中的取值(这里应该加上一句,小于前一个节点,比如下面如果是查找3,那么就从1跳下去),就会转入下一层链表继续找。这也就是说在查找过程中,搜索是跳跃式的。如上图所示,在跳表中查找元素 18。

ly-20241212141944002 查找 18 的时候原来需要遍历 18 次,现在只需要 7 次即可。针对链表长度比较大的时候,构建索引查找效率的提升就会非常明显 (这里好像不太对,原来也不需要遍历18次,反正大概率是说效率高就是了

使用跳表实现 Map 和使用哈希算法实现 Map 的另外一个不同之处是:哈希并不会保存元素的顺序,而跳表内所有的元素都是排序的。因此在对跳表进行遍历时,你会得到一个有序的结果。所以,如果你的应用需要有序性,那么跳表就是你不二的选择。JDK 中实现这一数据结构的类是 ConcurrentSkipListMap