面试官: Exchanger在项目中有使用过吗?说说看(源码剖析)

前言

目前正在出一个Java多线程专题长期系列教程,从入门到进阶含源码解读, 篇幅会较多, 喜欢的话,给个关注❤️ ~

Java提供了一些非常好用的并发工具类,不需要我们重复造轮子,本节我们讲解Exchanger,一起来看下吧~

Exchanger

Exchanger类用于两个线程交换数据,支持泛型,下面我们通过例子感受一下:

    private final Object slotExchange(Object item, boolean timed, long ns) {
        // 获取当前线程的交换节点
        Node p = participant.get();
        Thread t = Thread.currentThread();
        if (t.isInterrupted()) // preserve interrupt status so caller can recheck
            return null;

        // 常规自旋操作
        for (Node q;;) {
            // 如果 q不为空 说明q已经被占了
            if ((q = slot) != null) {
                if (U.compareAndSwapObject(this, SLOT, q, null)) {
                    // 获取交换值
                    Object v = q.item;
                    // 设置交换值
                    q.match = item;
                    Thread w = q.parked;
                    // 唤醒等待线程
                    if (w != null)
                        U.unpark(w);
                    // 交换成功返回结果    
                    return v;
                }

                // CPU核数数多于1个, 且bound为0时创建arena数组,并将bound设置为SEQ大小
                if (NCPU > 1 && bound == 0 &&
                    U.compareAndSwapInt(this, BOUND, 0, SEQ))
                    arena = new Node[(FULL + 2) << ASHIFT];
            }
            // 重定向到多槽交换 arenaExchange
            else if (arena != null)
                return null// caller must reroute to arenaExchange
            else {
                // 占用slot
                p.item = item;
                if (U.compareAndSwapObject(this, SLOT, null, p))
                    break;
                // 失败设置为null 进入下一次自旋   
                p.item = null;
            }
        }

        // 下面的操作主要是匹配等待接收的线程
        int h = p.hash;
        long end = timed ? System.nanoTime() + ns : 0L;
        // 自旋的次数
        int spins = (NCPU > 1) ? SPINS : 1;
        Object v;

        // 匹配线程未到 就进入自旋
        while ((v = p.match) == null) {
            if (spins > 0) {
                // 优化操作
                h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
                if (h == 0)
                    h = SPINS | (int)t.getId();
                else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
                    Thread.yield();
            }
            // 说明匹配线程已找到 但是还未完全准备好
            else if (slot != p)
                spins = SPINS;
            // 自选时间过长 还未匹配到进入阻塞,常规操作
            else if (!t.isInterrupted() && arena == null &&
                     (!timed || (ns = end - System.nanoTime()) > 0L)) {
                U.putObject(t, BLOCKER, this);
                p.parked = t;
                if (slot == p)
                    U.park(false, ns);
                p.parked = null;
                U.putObject(t, BLOCKER, null);
            }
            // 超时 让出给其它线程
            else if (U.compareAndSwapObject(this, SLOT, p, null)) {
                v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
                break;
            }
        }
        U.putOrderedObject(p, MATCH, null);
        p.item = null;
        p.hash = h;
        return v;
    }

arenaExchange

操作与slotExchange类似,但是它更复杂一点,需要通过index来命中,这里就不带大家看了,有兴趣的同学可以自己去看下

结束语

下节给大家讲下CountDownLatch ~

往期内容推荐

项目源码(源码已更新 欢迎star⭐️)

  • java-thread-all

  • 地址: https://github.com/qiuChengleiy/java-thread-all.git

推荐 SpringBoot & SpringCloud (源码已更新 欢迎star⭐️)


本篇文章来源于微信公众号: 程序员皮卡秋



微信扫描下方的二维码阅读本文

此作者没有提供个人介绍
最后更新于 2023-06-27