CHANSHIYU
GITHUBZERO
  • README
  • 時雨
    • 2017
      • 01 网站动态标题的两种方式
      • 02 RN App 外部唤醒踩坑记
    • 2018
      • 01 不一样の烟火
      • 02 Python 之禅
      • 03 Python 文件操作
    • 2019
      • 01 Aurora 食用指南
      • 02 Godaddy 域名找回记事
      • 03 一个接口的诞生
      • 04 SpringMVC 前后端传参协调
      • 05 主题集成友链访问统计
      • 06 Github Style 博客主题
      • 07 字符编码の小常识
      • 08 WSL 安装 Docker 实录
      • 09 Eriri comic reader
      • 10 Aurora 2.0
      • 11 jsDelivr 全站托管
      • 12 两年工作台变迁史
      • 13 春物
      • 14 一种优雅の笔记方式
    • 2020
      • 01 Telegram 电报机器人
      • 02 她的眼里有星辰
      • 03 文心雕龙
      • 04 软萌木子の有趣笔谈
      • 05 Telegram RSS 订阅频道
      • 06 水月雨银色飞船
      • 07 五年前旧照
    • 2021
      • 01 春宵苦短 2020
      • 02 风花
    • 2022
      • 01 小城新貌
      • 02 原神满级纪念
    • 2023
      • 01 2022 逆旅
      • 02 半透明背景图实现
      • 03 新年攒台海景房
  • 前端
    • JavaScript
      • 01 JavaScript 秘密花园
      • 02 JavaScript 正则技巧
      • 03 从浏览器解析 JS 运行机制
      • 04 Canvas 基础用法
      • 05 Blob Url And Data Url
      • 06 函数节流与函数防抖
      • 07 排序算法初探
      • 08 洗牌算法实现数组乱序
      • 09 正则匹配 match 和 exec
      • 10 正则匹配汉字
      • 11 JSX.Element vs ReactElement
      • 12 可选链与空值合并
      • 13 TypeScript 编码规范
      • 14 Typescript 中 interface 和 type 区别
      • 15 TypeScript 高级类型
      • 16 TypeScript 关键字
      • 17 TypeScript 映射类型
    • CSS
      • 01 Flex 弹性布局
      • 02 Position 定位
      • 03 CSS 逻辑属性
    • Node
      • 01 Node Tips
      • 02 七天学会 NodeJS
    • Note
      • 01 Note
      • 02 Code
      • 03 Snippets
      • 04 Git
    • React
      • 01 React Props Children 传值
      • 02 Use a Render Prop!
      • 03 React Hook
      • 04 React Hook 定时器
      • 05 Fetch data with React Hooks
      • 06 React 和 Vue 中 key 的作用
      • 07 useCallback 的正确使用方式
      • 08 useLayoutEffect 和 useEffect 的区别
      • 09 forwardRef 逃生舱
      • 10 React 条件渲染
    • Vue
      • 01 Vue Tips
      • 02 Vue 构建项目写入配置文件
      • 03 Vue 项目引入 SVG 图标
      • 04 Vue 一键导出 PDF
      • 05 动态可响应对象
      • 06 Vue 引入 SCSS
      • 07 Vue 路由权限控制
    • 实战系列
      • 01 WebSocket 心跳重连机制
      • 02 图片加解密二三事
      • 03 优雅实现 BackTop
      • 04 动态加载 JS 文件
      • 05 常用 DOM 方法比较
      • 06 AbortController 中断 fetch
      • 07 计算字符所占字节数
      • 08 Axios 自定义返回值类型
  • 后端
    • Java
      • 01 面向对象基本特征与原则
      • 02 Java 数据类型
      • 03 Java String
      • 04 Java 只有值传递
      • 05 Java final 与 static
      • 06 Java Object 通用方法
      • 07 Java 继承
      • 08 Java 反射
      • 09 Java 异常
      • 10 Java 容器
      • 11 Java 虚拟机
      • 12 Java IO
      • 13 Java HashMap
      • 14 Java List
      • 15 Java Stream
      • 16 Java 枚举
      • 17 Java 日期与时间
      • 18 Java fail fast
      • 19 Java BiFunction 和 BinaryOperator
    • 并发编程
      • 01 Java 并发
      • 02 synchronized
      • 03 volatile
      • 04 ReentrantLock
      • 05 ReadWriteLock
      • 06 StampedLock
      • 07 CompletableFuture
      • 08 ForkJoin
      • 09 ThreadLocal
      • 10 CountDownLatch
      • 11 ThreadPoolExecutor
      • 12 ExecutorService
      • 13 Atom 原子类
      • 14 BlockingQueue
    • 高效编程
      • 01 30 seconds of java8
      • 02 函数式替代 for 循环
      • 03 Java 字符串拼接
      • 04 单例模式的几种实现
      • 05 HashMap 排序
    • 理论概念
      • 01 Java Servlet
      • 02 Java 服务端分层模型
      • 03 经典排序算法
      • 04 LRU 缓存淘汰算法
      • 05 BloomFilter 判断元素存在
      • 06 Java HashMap 面试大全
      • 07 HTTP 状态码详解
      • 08 Cookie 和 Session
      • 09 基于消息队列的分布式事务解决方案
      • 10 微服务之所见
    • 实战系列
      • 01 AES CBC 加解密
      • 02 Magic 魔数获取文件类型
      • 03 获取请求 IP 地址
      • 04 Kaptcha 与数学公式验证码
      • 05 Netty 获取客户端 IP.md
      • 06 高性能无锁队列 Disruptor.md
      • 07 前后端接入阿里云盾
    • Linux
      • 01 Linux 文件权限系统
      • 02 Linux 常用软件安装
      • 03 CentOS 防火墙
    • MySQL
      • 01 MySQL
      • 02 SQL 语句 where 1=1
      • 03 truncate 和 delete
      • 04 事务
      • 05 关系模型
      • 06 Mybatis
      • 07 MySQL 查看数据库表详情
    • Nginx
      • 01 Nginx 指北
      • 02 nginx gzip 压缩
    • Note
      • 01 Vagrant
      • 02 Docker
      • 03 Lombok
      • 04 Swagger
      • 05 Redis
    • Spring
      • 01 Spring Boot
      • 02 Spring Validation
      • 03 Spring Data
      • 04 Spring 容器
      • 05 Spring AOP
      • 06 Spring Transactional 注解
      • 07 Spring Cloud Netflix
      • 08 Spring Cloud Alibaba
      • 09 Spring Security oAuth2
      • 10 Spring Boot 跨域解决方式
      • 11 Spring Boot 请求拦截
      • 12 Spring Boot 异步编程
      • 13 Spring Boot 定时任务
      • 14 Spring Boot 管理 bean
      • 15 Mybatis 逆向代码生成
      • 16 JWT
      • 17 JPA
      • 18 Apache Shiro
      • 19 Spring 异步请求
  • 书斋
    • ES6 标准入门
      • 01 变量声明与解构赋值
      • 02 语法的扩展
      • 03 数据类型与数据结构
      • 04 Proxy 和 Reflect
      • 05 异步编程 Promise
      • 06 Iterator 和 for of 循环
      • 07 Generator 函数
      • 08 Async 函数
      • 09 Class 类
    • JavaScript 设计模式
      • 01 基础知识
      • 02 设计模式(上)
      • 03 设计模式(下)
      • 04 设计原则和编程技巧
  • 纸函
    • 01 Interview
    • 02 Ceph
    • 03 动态规划
    • 04 Document.designMode
    • 2023-01-10
  • 万藏
    • 文档
      • 01 Git 文档
      • 02 Linux 命令大全
      • 03 七天学会 NodeJS
      • 04 Algorithms
    • 工具
      • 01 Nginx Config
      • 02 ProcessOn
      • 03 Flat Icon
      • 04 Regexper
      • 05 TempMail
      • 06 Carbon
由 GitBook 提供支持
在本页
  • 介绍
  • 消费者的等待策略
  • 食用方式
  • 引入依赖
  • 命令字和数据包
  • 轮询策略
  • 生成者和消费者
  • 构造工厂
  • 启动
  • 测试消息生产消费
  • 一些方案
  • 规避数据覆盖

这有帮助吗?

  1. 后端
  2. 实战系列

06 高性能无锁队列 Disruptor.md

上一页05 Netty 获取客户端 IP.md下一页07 前后端接入阿里云盾

最后更新于2年前

这有帮助吗?

Disruptor 是英国外汇交易公司 LMAX 开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题,因其出色的性能表现获得 2011 Duke’s 程序框架创新奖。

A High Performance Inter-Thread Messaging Library 项目地址:

介绍

从数据结构上来看,Disruptor 是一个支持生产者/消费者模式的环形队列。能够在无锁的条件下进行并行消费,也可以根据消费者之间的依赖关系进行先后消费次序。

Disruptor 高效原理:

  1. Disruptor 使用了一个 RingBuffer 替代队列,用生产者消费者指针替代锁。

  2. 生产者消费者指针使用 CPU 支持的整数自增,无需加锁并且速度很快。Java 的实现在 Unsafe package 中。

消费者的等待策略

名称
措施
适用场景

BlockingWaitStrategy

加锁

CPU 资源紧缺,吞吐量和延迟并不重要的场景

BusySpinWaitStrategy

自旋

通过不断重试,减少切换线程导致的系统调用,而降低延迟。推荐在线程绑定到固定的 CPU 的场景下使用

PhasedBackoffWaitStrategy

自旋 + yield + 自定义策略

CPU 资源紧缺,吞吐量和延迟并不重要的场景

SleepingWaitStrategy

自旋 + yield + sleep

性能和 CPU 资源之间有很好的折中。延迟不均匀

TimeoutBlockingWaitStrategy

加锁,有超时限制

CPU 资源紧缺,吞吐量和延迟并不重要的场景

YieldingWaitStrategy

自旋 + yield + 自旋

性能和 CPU 资源之间有很好的折中。延迟比较均匀

名称
适用场景

BlockingWaitStrategy

CPU 资源紧缺,吞吐量和延迟并不重要的场景

BusySpinWaitStrategy

通过不断重试,减少切换线程导致的系统调用,而降低延迟。推荐在线程绑定到固定的 CPU 的场景下使用

PhasedBackoffWaitStrategy

CPU 资源紧缺,吞吐量和延迟并不重要的场景

SleepingWaitStrategy

性能和 CPU 资源之间有很好的折中。延迟不均匀

TimeoutBlockingWaitStrategy

CPU 资源紧缺,吞吐量和延迟并不重要的场景

YieldingWaitStrategy

性能和 CPU 资源之间有很好的折中。延迟比较均匀

食用方式

引入依赖

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.2</version>
</dependency>

命令字和数据包

/**
 * @author SHIYU
 * @description 无锁队列命令字
 * @since 2020-06-13
 */
public interface IDisruptorCommand {

    /**
     * 测试消息 hello
     */
    int CHECK_MSG_HELLO = 1;

    /**
     * 测试消息 hi
     */
    int CHECK_MSG_HI = 2;

}
/**
 * @author SHIYU
 * @description 传输的数据
 * @since 2020-06-13
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TranslatorDataWrapper {

    private int command;

    private Object target;

}

轮询策略

/**
 * @author SHIYU
 * @description 轮询策略
 * @since 2020-06-13
 */
@Configuration
public class DisruptorWaitStrategyConfiguration {

    @Bean
    @ConditionalOnMissingBean(WaitStrategy.class)
    public WaitStrategy getWaitStrategy() {
        // 如果 CPU 比较叼的话,可以用 YieldingWaitStrategy
        return new BlockingWaitStrategy();
    }

}

生成者和消费者

/**
 * @author SHIYU
 * @description 消息生产者
 * @since 2020-06-13
 */
@Data
@Slf4j
@AllArgsConstructor
public class MessageProducer {

    private RingBuffer<TranslatorDataWrapper> ringBuffer;

    /**
     * 发布事件
     *
     * @param command 命令字
     * @param object 数据
     */
    public void publish(int command, Object object) {
        long sequence = ringBuffer.next();
        try {
            TranslatorDataWrapper wrapper = ringBuffer.get(sequence);
            wrapper.setCommand(command);
            wrapper.setTarget(object);
        } finally {
            ringBuffer.publish(sequence);
        }
    }

}
/**
 * 消息消费者
 *
 * @author nk
 */
@Slf4j
public class MessageConsumer implements WorkHandler<TranslatorDataWrapper> {

    @Override
    public void onEvent(TranslatorDataWrapper wrapper) {
        int command = wrapper.getCommand();
        switch (command) {
            case IDisruptorCommand.CHECK_MSG_HELLO:
                log.info("消费消息 =============== hello");
                break;
            case IDisruptorCommand.CHECK_MSG_HI:
                log.info("消费消息 =============== hi");
                break;
            default:
                break;
        }
    }

}

构造工厂

disruptor.buffer.size 这里设置为 1024 * 1024 即 1048576。

disruptor:
  buffer:
    size: 1048576
/**
 * @author SHIYU
 * @description 环型无锁队列
 * @since 2020-06-13
 */
@Slf4j
@Component
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class RingBufferWorkerPoolFactory {

    @Value("${disruptor.buffer.size}")
    private int mBufferSize;

    private final WaitStrategy mWaitStrategy;

    private Map<Integer, MessageProducer> producers = new ConcurrentHashMap<>();

    private RingBuffer<TranslatorDataWrapper> ringBuffer;

    public void initAndStart(MessageConsumer[] messageConsumers) {
        // 1.构建 ringBuffer 对象
        this.ringBuffer = RingBuffer.create(ProducerType.MULTI,
                TranslatorDataWrapper::new,
                mBufferSize,
                mWaitStrategy);
        // 2.通过 ringBuffer 创建一个屏障
        SequenceBarrier sequenceBarrier = this.ringBuffer.newBarrier();
        // 3.创建多个消费者数组
        WorkerPool<TranslatorDataWrapper> workerPool = new WorkerPool<>(
                this.ringBuffer,
                sequenceBarrier,
                new EventExceptionHandler(),
                messageConsumers);
        // 4.设置多个消费者的 sequence 序号,用于单独统计消费进度,并且设置到 ringBuffer 中
        this.ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
        // 5.启动工作池
        int processorsCount = Runtime.getRuntime().availableProcessors();
        log.info("进程数 -> {}", processorsCount);
        workerPool.start(Executors.newFixedThreadPool(processorsCount));
    }

    public MessageProducer getMessageProducer(int command) {
        MessageProducer messageProducer = producers.get(command);
        if (messageProducer == null) {
            messageProducer = new MessageProducer(this.ringBuffer);
            producers.put(command, messageProducer);
        }
        return messageProducer;
    }

    /**
     * 异常静态类
     */
    @Slf4j
    static class EventExceptionHandler implements ExceptionHandler<TranslatorDataWrapper> {

        @Override
        public void handleEventException(Throwable ex, long sequence, TranslatorDataWrapper event) {
            log.error("handleEventException -> ex:{}  sequence:{} event:{}", ex.getMessage(), sequence, event.getClass().toString());
            ex.printStackTrace();
        }

        @Override
        public void handleOnStartException(Throwable ex) {
            log.error("handleOnStartException -> ex:{}", ex.getMessage());
            ex.printStackTrace();
        }

        @Override
        public void handleOnShutdownException(Throwable ex) {
            log.error("handleOnShutdownException -> ex:{} ", ex.getMessage());
            ex.printStackTrace();
        }
    }

}
public Disruptor(
        final EventFactory<T> eventFactory,
        final int ringBufferSize,
        final ThreadFactory threadFactory,
        final ProducerType producerType,
        final WaitStrategy waitStrategy)
{
    this(
        RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
        new BasicExecutor(threadFactory));
}
  • eventFactory:在环形缓冲区中创建事件的 factory;

  • ringBufferSize:环形缓冲区的大小,必须是 2 的幂;

  • threadFactory:用于为处理器创建线程;

  • producerType:生成器类型以支持使用正确的 sequencer 和 publisher 创建 RingBuffer;枚举类型,SINGLE、MULTI 两个项。对应于 SingleProducerSequencer 和 MultiProducerSequencer 两种 Sequencer;

  • waitStrategy:等待策略;

启动

public static void main(String[] args) {
    SpringApplication.run(YukoApplication.class, args);

    // 启动 disruptor
    MessageConsumer[] consumers = new MessageConsumer[8];
    for (int i = 0; i < consumers.length; i++) {
        MessageConsumer messageConsumer = new MessageConsumer();
        consumers[i] = messageConsumer;
    }
    RingBufferWorkerPoolFactory factory = SpringUtil.getBean(RingBufferWorkerPoolFactory.class);
    factory.initAndStart(consumers);
}

测试消息生产消费

private RingBufferWorkerPoolFactory getWorkerPoolFactory() {
    return SpringUtil.getBean(RingBufferWorkerPoolFactory.class);
}

@Scheduled(fixedDelay = 1000, initialDelay = 3000)
private void msg() {
    IntStream.range(1, 9).forEach(i -> {
        int command = i % 2 == 0 ? IDisruptorCommand.CHECK_MSG_HELLO : IDisruptorCommand.CHECK_MSG_HI;
        TranslatorDataWrapper wrapper = new TranslatorDataWrapper(command, "WORLD");
        MessageProducer messageProducer = getWorkerPoolFactory().getMessageProducer(command);
        messageProducer.publish(command, wrapper);
    });
}
2020-06-13 09:45:09.404  INFO 21580 --- [pool-1-thread-1] c.c.y.d.consumer.MessageConsumer  : 消费消息 =============== hello
2020-06-13 09:45:09.404  INFO 21580 --- [pool-1-thread-7] c.c.y.d.consumer.MessageConsumer  : 消费消息 =============== hi
2020-06-13 09:45:09.404  INFO 21580 --- [pool-1-thread-6] c.c.y.d.consumer.MessageConsumer  : 消费消息 =============== hello
2020-06-13 09:45:09.404  INFO 21580 --- [pool-1-thread-3] c.c.y.d.consumer.MessageConsumer  : 消费消息 =============== hi
2020-06-13 09:45:09.404  INFO 21580 --- [pool-1-thread-2] c.c.y.d.consumer.MessageConsumer  : 消费消息 =============== hello
2020-06-13 09:45:09.404  INFO 21580 --- [pool-1-thread-4] c.c.y.d.consumer.MessageConsumer  : 消费消息 =============== hi
2020-06-13 09:45:09.404  INFO 21580 --- [pool-1-thread-5] c.c.y.d.consumer.MessageConsumer  : 消费消息 =============== hello

一些方案

规避数据覆盖

使用 Disruptor,首先需要构建一个 RingBuffer,并指定一个大小,注意如果 RingBuffer 里面数据超过了这个大小则会覆盖旧数据。这可能是一个风险,但 Disruptor 提供了检查 RingBuffer 是否写满的机制用于规避这个问题。

// if capacity less than 10%, don't use ringbuffer anymore
if(ringBuffer.remainingCapacity() < RING_SIZE * 0.1) {
    log.warn("disruptor:ringbuffer avaliable capacity is less than 10 %");
    return;
}
// Publishers claim events in sequence
long sequence = ringBuffer.next();
try {
    TranslatorDataWrapper wrapper = ringBuffer.get(sequence);
    wrapper.setCommand(command);
    wrapper.setTarget(object);
} finally {
    ringBuffer.publish(sequence);
}

Bless Bless!

参考文章:

LMAX Disruptor
高性能队列 Disruptor 的使用
蚂蚁金服分布式链路跟踪组件 SOFATracer 中 Disruptor 实践