多线程与高并发编程系列(六)之容器发展和使用方式

2021/03/18 JUC 共 8498 字,约 25 分钟

本篇将会着重介绍容器的发展历史,讲解部分容器的源码以及介绍容器的使用方法和使用场景======

容器整体介绍

容器分类

image

  • Java的容器从大方向来说分为两类,分别是Collection和Map,Collection又分为List、Set、Queue。

容器的发展史

  • 最开始的时候,Java的容器有两种,一个是Vector,一个是HashTable;Vector是list的前身,HashTable是Map的前身

  • HashTable和Vector作为一代的容器,容器中几乎所有的方法都是加上了sync方法的

    image

  • 我们知道,初代的Sync的效率非常低,所以为了解决这个问题,sun公司提供了HashMap方法

  • HashMap方法是一个完全没有加上锁的方法,其余的实现基本没有太大区别

  • 可能是觉得HashMap完全没有锁不太好,于是sun公司提供了一个方法可以将HashMap转换为线程安全的容器

    image

    image

  • SynchronizedMap和HashTable最大的区别就在于SynchronizedMap的锁更加细化,我们可以看到,SynchronizedMap的里面添加了一个mutex对象,这个对象用于作为锁,每一次上锁的时候不需要将整个方法给锁上,只需要锁上mutex对象即可

  • 后来随着高并发场景的增加,还出现了适用于高并发场景的容器ConcurrentHashMap,这个时间段也是Queue出现的时候,Queue同时也是为了高并发场景设计的容器

高并发场景下线程安全容器的效率比较

  • 我们将对HashTable、SynchronizedHashMap、ConcurrentHashMap的效率进行比较,我们启动100个线程,每一个线程往容器中 put 一万个对象;然后100个线程启动,每一个线程将容器中的第10个取10000000遍

  • HashTable效率

    public class Effective_HashTable {
        static Hashtable<UUID, UUID> m = new Hashtable<>();
      
        static int count = Constants.COUNT;
        static UUID[] keys = new UUID[count];
        static UUID[] values = new UUID[count];
        static final int THREAD_COUNT = Constants.THREAD_COUNT;
      
        static {
            for (int i = 0; i < count; i++) {
                keys[i] = UUID.randomUUID();
                values[i] = UUID.randomUUID();
            }
        }
      
        static class MyThread extends Thread {
            int start;
            int gap = count/THREAD_COUNT;
      
            public MyThread(int start) {
                this.start = start;
            }
      
            @Override
            public void run() {
                for(int i=start; i<start+gap; i++) {
                    m.put(keys[i], values[i]);
                }
            }
        }
      
        public static void main(String[] args) {
      
            long start = System.currentTimeMillis();
      
            Thread[] threads = new Thread[THREAD_COUNT];
      
            for(int i=0; i<threads.length; i++) {
                threads[i] =
                        new MyThread(i * (count/THREAD_COUNT));
            }
      
            for(Thread t : threads) {
                t.start();
            }
      
            for(Thread t : threads) {
                try {
                    t.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
      
            long end = System.currentTimeMillis();
            System.out.println(end - start);
      
            System.out.println(m.size());
      
            //-----------------------------------
      
            start = System.currentTimeMillis();
            for (int i = 0; i < threads.length; i++) {
                threads[i] = new Thread(()->{
                    for (int j = 0; j < 10000000; j++) {
                        m.get(keys[10]);
                    }
                });
            }
      
            for(Thread t : threads) {
                t.start();
            }
      
            for(Thread t : threads) {
                try {
                    t.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
      
            end = System.currentTimeMillis();
            System.out.println(end - start);
        }
    }
    

    image

    我们可以看到,往HashTable容器中put消耗了0.35秒,但是get花费了41.5秒

  • SynchronizedHashMap效率

    public class Effective_SynchronizedHashMap {
        static Map<UUID, UUID> m = Collections.synchronizedMap(new HashMap<UUID, UUID>());
      
        static int count = Constants.COUNT;
        static UUID[] keys = new UUID[count];
        static UUID[] values = new UUID[count];
        static final int THREAD_COUNT = Constants.THREAD_COUNT;
      
        static {
            for (int i = 0; i < count; i++) {
                keys[i] = UUID.randomUUID();
                values[i] = UUID.randomUUID();
            }
        }
      
        static class MyThread extends Thread {
            int start;
            int gap = count/THREAD_COUNT;
      
            public MyThread(int start) {
                this.start = start;
            }
      
            @Override
            public void run() {
                for(int i=start; i<start+gap; i++) {
                    m.put(keys[i], values[i]);
                }
            }
        }
      
        public static void main(String[] args) {
      
            long start = System.currentTimeMillis();
      
            Thread[] threads = new Thread[THREAD_COUNT];
      
            for(int i=0; i<threads.length; i++) {
                threads[i] =
                        new MyThread(i * (count/THREAD_COUNT));
            }
      
            for(Thread t : threads) {
                t.start();
            }
      
            for(Thread t : threads) {
                try {
                    t.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
      
            long end = System.currentTimeMillis();
            System.out.println(end - start);
      
            System.out.println(m.size());
      
            //-----------------------------------
      
            start = System.currentTimeMillis();
            for (int i = 0; i < threads.length; i++) {
                threads[i] = new Thread(()->{
                    for (int j = 0; j < 10000000; j++) {
                        m.get(keys[10]);
                    }
                });
            }
      
            for(Thread t : threads) {
                t.start();
            }
      
            for(Thread t : threads) {
                try {
                    t.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
      
            end = System.currentTimeMillis();
            System.out.println(end - start);
        }
    }
    

    image

    我们发现,SynchronizedHashMap的效率和HashTable的差距不是特别大

  • ConcurrentHashMap效率

    public class Effective_ConcurrentHashMap {
        static Map<UUID, UUID> m = new ConcurrentHashMap<>();
      
        static int count = Constants.COUNT;
        static UUID[] keys = new UUID[count];
        static UUID[] values = new UUID[count];
        static final int THREAD_COUNT = Constants.THREAD_COUNT;
      
        static {
            for (int i = 0; i < count; i++) {
                keys[i] = UUID.randomUUID();
                values[i] = UUID.randomUUID();
            }
        }
      
        static class MyThread extends Thread {
            int start;
            int gap = count/THREAD_COUNT;
      
            public MyThread(int start) {
                this.start = start;
            }
      
            @Override
            public void run() {
                for(int i=start; i<start+gap; i++) {
                    m.put(keys[i], values[i]);
                }
            }
        }
      
        public static void main(String[] args) {
      
            long start = System.currentTimeMillis();
      
            Thread[] threads = new Thread[THREAD_COUNT];
      
            for(int i=0; i<threads.length; i++) {
                threads[i] =
                        new MyThread(i * (count/THREAD_COUNT));
            }
      
            for(Thread t : threads) {
                t.start();
            }
      
            for(Thread t : threads) {
                try {
                    t.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
      
            long end = System.currentTimeMillis();
            System.out.println(end - start);
      
            System.out.println(m.size());
      
            //-----------------------------------
      
            start = System.currentTimeMillis();
            for (int i = 0; i < threads.length; i++) {
                threads[i] = new Thread(()->{
                    for (int j = 0; j < 10000000; j++) {
                        m.get(keys[10]);
                    }
                });
            }
      
            for(Thread t : threads) {
                t.start();
            }
      
            for(Thread t : threads) {
                try {
                    t.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
      
            end = System.currentTimeMillis();
            System.out.println(end - start);
        }
    }
    

    image

    我们可以发现,ConcurrentHashMap的插入效率比起HashTable和HashMap的效率要低,但是查询的速度却是大大提高了

  • 这就是为什么我们说ConcurrentHashMap是专门为多线程生产环境准备的map,因为我们可以明显感受到ConcurrentHashMap在读数据上的巨大优势

容器的介绍之Queue

ConcurrentLinkedQueue

  • 我们借由ConcurrentLinkedQueue来介绍Queue的基本方法

  • add()

    i:一开始,add方法就是向queue中添加对象,但是如果添加失败的话就会抛出异常,这种方式非常不好用,所以现在很多的queue已经没有了真正意义上的add方法,而是改成了另一个方法

    image

  • offer()

    i:offer方法是一个替代add方法的方法,这个方法改进的地方就是添加的结果会返回一个boolean值,从而替代以前的add方法

    image

  • peek()

    i:peek方法顾名思义就是看一眼队列头部的对象

  • remove()

    i:remove方法是一个重载方法,你可以传入一个想要remove掉的对象来remove,或者直接remove队列头部的对象

  • poll()

    i:poll方法就是将对象头部的对象直接取出并删除

  • ConcurrentLinkedQueue方法是一个保证了线程安全的容器,这个容器的线程安全是依靠CAS保证的,因为是LinkedQueue,这个容器的容量是没有上限的

ArrayBlockingQueue

  • ArrayBlockingQueue在构造的时候必须先给一个容器的容量上限,当添加的对象数量达到了容器容量上限时就会添加失败这就是arrayQueue的特点,并且容器一旦创建就无法扩容,ArrayBlockingQueue比起一般Queue的差距主要就在于Blocking上,为了体现这个Blocking,这里提供了两个新方法,一个是put,一个是take

  • put()

    put方法是一个阻塞方法,当队列中没有空位的时候,尝试put的线程就会阻塞住,直到put成功为止

  • take()

    take方法也是一个阻塞方法,当队列中没有对象的时候,尝试take的线程就会阻塞住,直到take成功为止

DelayQueue

  • DelayQueue是一个BlockingQueue,你可以让队列按照你自己定义的优先级来排序,然后插入队列中,本质上就是一个PriorityQueue

    public class DelayQueue_Test {
        static BlockingQueue<MyTask> tasks = new DelayQueue<>();
      
        static Random r = new Random();
      
        static class MyTask implements Delayed {
            String name;
            long runningTime;
      
            MyTask(String name, long rt) {
                this.name = name;
                this.runningTime = rt;
            }
      
            @Override
            public int compareTo(Delayed o) {
                if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))
                    return -1;
                else if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS))
                    return 1;
                else
                    return 0;
            }
      
            @Override
            public long getDelay(TimeUnit unit) {
      
                return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
            }
      
      
            @Override
            public String toString() {
                return name + " " + runningTime;
            }
        }
      
        public static void main(String[] args) throws InterruptedException {
            long now = System.currentTimeMillis();
            MyTask t1 = new MyTask("t1", now + 1000);
            MyTask t2 = new MyTask("t2", now + 2000);
            MyTask t3 = new MyTask("t3", now + 1500);
            MyTask t4 = new MyTask("t4", now + 2500);
            MyTask t5 = new MyTask("t5", now + 500);
      
            tasks.put(t1);
            tasks.put(t2);
            tasks.put(t3);
            tasks.put(t4);
            tasks.put(t5);
      
            System.out.println(tasks);
      
            for(int i=0; i<5; i++) {
                System.out.println(tasks.take());
            }
        }
    }
    

    image

SynchronousQueue

  • 这个容器的容量为0,当你往容器中put对象的时候,线程就阻塞住了,直到另外一个线程过来take,这个容器起到的作用和Exchanger非常类似

    public class SynchronousQueue_Test {
        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<String> strs = new SynchronousQueue<>();
      
            new Thread(()->{
                  
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
      
                try {
                    System.out.println(strs.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
      
            strs.put("aaa"); //阻塞等待消费者消费
            //strs.put("bbb");
            //strs.add("aaa");
            System.out.println(strs.size());
        }
    }
    

TransferQueue

  • TransferQueue和像是BlockQueue和SynchronousQueue的合体,既可以存储对象,又可以像Exchanger一样 “手把手” 将对象交给其他线程

容器源码分析之HashMap

  • 对于HashMap的源码,我们都非常熟悉,我这里就大致略过简单的部分,着重讲解核心的部分

HashMap构造

  1. 当HashMap构造的时候,实际上并没有开辟专门存放节点(Node)的数组,而是在第一次插入的时候创建

HashMap.put

put方法是HashMap的核心方法,理解了put就理解了HashMap

  1. put方法在执行的时候,首先会判断table是否初始化,如果没有初始化的话就会优先初始化。初始化的结果是得到一个数组长度为16的Node[]数组,临界点为数组长度的3/4

    image

  2. map在往数组中放置节点的时候,并不是挨着放置,而是使用了一个简单的算法:key.hashcode & (table.length - 1)。这个算法非常有意思,首先,table.length - 1是一个很有趣的数字,因为table.length永远会是16的2、4、8、16……倍(table没有初始化时table没有length,所以不算入),这样的一个数字在-1后,这个数的二进制码一定是末尾有一连串的1

    image

    这样的数字和一个 int 的值取 & 操作后,就只会留下最高位不超过 table.length-1 的值,这个值就可以直接作为table[ ]的index,将这个node节点直接存入,效率极其高

HashMap解决哈希碰撞

  1. 当碰撞的哈希不超过8个时,所有的对象都是被吊在一串链表上面的,当碰撞的hash超过8时,就会将链表结构转变为红黑树结构;如果已经变成了红黑树,那么只有当碰撞数量少于7时才会退化为链表结构

HashMap.resize

  1. HashMap的扩容临界点是超过了当前的容量最大值的 3 / 4

  2. HashMap的扩容策略是直接将原本的容量和临界点左移1位,也就是直接扩大为原来的2倍,如果达到了INTEGER.MAX_VALUE,那么就不会再扩大容量,而是将临界点扩大到INTEGER.MAX_VALUE

  3. resize中最重要的点就是如何将原来的数据移植到新的Node[ ],如果没有hash碰撞的话,那么解决方法就是直接rehash一下,rehash的值就作为新的index

    image

    如果有哈希碰撞的话,那么首先判断是不是红黑树结构,如果是,那么就将树分为两个树,这个算法不详细讲,因为遇到的不多

    image

    如果是链表结构,那么就会进入一个非常有意思的算法

    image

    image

    image

文档信息

Search

    Table of Contents