Disruptor 是英国外汇交易公司 LMAX 开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题,因其出色的性能表现获得 2011 Duke’s 程序框架创新奖。
A High Performance Inter-Thread Messaging Library 项目地址:LMAX Disruptor
介绍
从数据结构上来看,Disruptor 是一个支持生产者/消费者模式的环形队列 。能够在无锁的条件下进行并行消费,也可以根据消费者之间的依赖关系进行先后消费次序。
Disruptor 高效原理:
Disruptor 使用了一个 RingBuffer 替代队列,用生产者消费者指针替代锁。
生产者消费者指针使用 CPU 支持的整数自增,无需加锁并且速度很快。Java 的实现在 Unsafe package 中。
消费者的等待策略
通过不断重试,减少切换线程导致的系统调用,而降低延迟。推荐在线程绑定到固定的 CPU 的场景下使用
PhasedBackoffWaitStrategy
TimeoutBlockingWaitStrategy
性能和 CPU 资源之间有很好的折中。延迟比较均匀
通过不断重试,减少切换线程导致的系统调用,而降低延迟。推荐在线程绑定到固定的 CPU 的场景下使用
PhasedBackoffWaitStrategy
TimeoutBlockingWaitStrategy
性能和 CPU 资源之间有很好的折中。延迟比较均匀
食用方式
引入依赖
复制 < dependency >
< groupId >com.lmax</ groupId >
< artifactId >disruptor</ artifactId >
< version >3.4.2</ version >
</ dependency >
命令字和数据包
复制 /**
* @author SHIYU
* @description 无锁队列命令字
* @since 2020-06-13
*/
public interface IDisruptorCommand {
/**
* 测试消息 hello
*/
int CHECK_MSG_HELLO = 1 ;
/**
* 测试消息 hi
*/
int CHECK_MSG_HI = 2 ;
}
复制 /**
* @author SHIYU
* @description 传输的数据
* @since 2020-06-13
*/
@ Data
@ NoArgsConstructor
@ AllArgsConstructor
public class TranslatorDataWrapper {
private int command;
private Object target;
}
轮询策略
复制 /**
* @author SHIYU
* @description 轮询策略
* @since 2020-06-13
*/
@ Configuration
public class DisruptorWaitStrategyConfiguration {
@ Bean
@ ConditionalOnMissingBean ( WaitStrategy . class )
public WaitStrategy getWaitStrategy () {
// 如果 CPU 比较叼的话,可以用 YieldingWaitStrategy
return new BlockingWaitStrategy() ;
}
}
生成者和消费者
复制 /**
* @author SHIYU
* @description 消息生产者
* @since 2020-06-13
*/
@ Data
@ Slf4j
@ AllArgsConstructor
public class MessageProducer {
private RingBuffer < TranslatorDataWrapper > ringBuffer;
/**
* 发布事件
*
* @param command 命令字
* @param object 数据
*/
public void publish ( int command , Object object) {
long sequence = ringBuffer . next ();
try {
TranslatorDataWrapper wrapper = ringBuffer . get (sequence);
wrapper . setCommand (command);
wrapper . setTarget (object);
} finally {
ringBuffer . publish (sequence);
}
}
}
复制 /**
* 消息消费者
*
* @author nk
*/
@ Slf4j
public class MessageConsumer implements WorkHandler < TranslatorDataWrapper > {
@ Override
public void onEvent ( TranslatorDataWrapper wrapper) {
int command = wrapper . getCommand ();
switch (command) {
case IDisruptorCommand . CHECK_MSG_HELLO :
log . info ( "消费消息 =============== hello" );
break ;
case IDisruptorCommand . CHECK_MSG_HI :
log . info ( "消费消息 =============== hi" );
break ;
default:
break ;
}
}
}
构造工厂
disruptor.buffer.size
这里设置为 1024 * 1024 即 1048576。
复制 disruptor :
buffer :
size : 1048576
复制 /**
* @author SHIYU
* @description 环型无锁队列
* @since 2020-06-13
*/
@ Slf4j
@ Component
@ RequiredArgsConstructor (onConstructor = @ __ (@ Autowired ))
public class RingBufferWorkerPoolFactory {
@ Value ( "${disruptor.buffer.size}" )
private int mBufferSize;
private final WaitStrategy mWaitStrategy;
private Map < Integer , MessageProducer > producers = new ConcurrentHashMap <>();
private RingBuffer < TranslatorDataWrapper > ringBuffer;
public void initAndStart ( MessageConsumer [] messageConsumers) {
// 1.构建 ringBuffer 对象
this . ringBuffer = RingBuffer . create ( ProducerType . MULTI ,
TranslatorDataWrapper ::new ,
mBufferSize ,
mWaitStrategy);
// 2.通过 ringBuffer 创建一个屏障
SequenceBarrier sequenceBarrier = this . ringBuffer . newBarrier ();
// 3.创建多个消费者数组
WorkerPool < TranslatorDataWrapper > workerPool = new WorkerPool <>(
this . ringBuffer ,
sequenceBarrier ,
new EventExceptionHandler() ,
messageConsumers);
// 4.设置多个消费者的 sequence 序号,用于单独统计消费进度,并且设置到 ringBuffer 中
this . ringBuffer . addGatingSequences ( workerPool . getWorkerSequences ());
// 5.启动工作池
int processorsCount = Runtime . getRuntime () . availableProcessors ();
log . info ( "进程数 -> {}" , processorsCount);
workerPool . start ( Executors . newFixedThreadPool (processorsCount));
}
public MessageProducer getMessageProducer ( int command) {
MessageProducer messageProducer = producers . get (command);
if (messageProducer == null ) {
messageProducer = new MessageProducer( this . ringBuffer ) ;
producers . put (command , messageProducer);
}
return messageProducer;
}
/**
* 异常静态类
*/
@ Slf4j
static class EventExceptionHandler implements ExceptionHandler < TranslatorDataWrapper > {
@ Override
public void handleEventException ( Throwable ex , long sequence , TranslatorDataWrapper event) {
log . error ( "handleEventException -> ex:{} sequence:{} event:{}" , ex . getMessage () , sequence , event . getClass () . toString ());
ex . printStackTrace ();
}
@ Override
public void handleOnStartException ( Throwable ex) {
log . error ( "handleOnStartException -> ex:{}" , ex . getMessage ());
ex . printStackTrace ();
}
@ Override
public void handleOnShutdownException ( Throwable ex) {
log . error ( "handleOnShutdownException -> ex:{} " , ex . getMessage ());
ex . printStackTrace ();
}
}
}
复制 public Disruptor(
final EventFactory< T > eventFactory ,
final int ringBufferSize ,
final ThreadFactory threadFactory ,
final ProducerType producerType ,
final WaitStrategy waitStrategy)
{
this (
RingBuffer . create (producerType , eventFactory , ringBufferSize , waitStrategy) ,
new BasicExecutor(threadFactory) );
}
eventFactory
:在环形缓冲区中创建事件的 factory;
ringBufferSize
:环形缓冲区的大小,必须是 2 的幂;
threadFactory
:用于为处理器创建线程;
producerType
:生成器类型以支持使用正确的 sequencer
和 publisher
创建 RingBuffer
;枚举类型,SINGLE
、MULTI
两个项。对应于 SingleProducerSequencer
和 MultiProducerSequencer
两种 Sequencer
;
启动
复制 public static void main( String [] args) {
SpringApplication . run ( YukoApplication . class , args);
// 启动 disruptor
MessageConsumer [] consumers = new MessageConsumer [ 8 ];
for ( int i = 0 ; i < consumers . length ; i ++ ) {
MessageConsumer messageConsumer = new MessageConsumer() ;
consumers[i] = messageConsumer;
}
RingBufferWorkerPoolFactory factory = SpringUtil . getBean ( RingBufferWorkerPoolFactory . class );
factory . initAndStart (consumers);
}
测试消息生产消费
复制 private RingBufferWorkerPoolFactory getWorkerPoolFactory() {
return SpringUtil . getBean ( RingBufferWorkerPoolFactory . class );
}
@ Scheduled (fixedDelay = 1000 , initialDelay = 3000 )
private void msg() {
IntStream . range ( 1 , 9 ) . forEach (i -> {
int command = i % 2 == 0 ? IDisruptorCommand . CHECK_MSG_HELLO : IDisruptorCommand . CHECK_MSG_HI ;
TranslatorDataWrapper wrapper = new TranslatorDataWrapper(command , "WORLD" ) ;
MessageProducer messageProducer = getWorkerPoolFactory() . getMessageProducer (command);
messageProducer . publish (command , wrapper);
});
}
复制 2020-06-13 09:45:09.404 INFO 21580 --- [pool-1-thread-1] c.c.y.d.consumer.MessageConsumer : 消费消息 =============== hello
2020-06-13 09:45:09.404 INFO 21580 --- [pool-1-thread-7] c.c.y.d.consumer.MessageConsumer : 消费消息 =============== hi
2020-06-13 09:45:09.404 INFO 21580 --- [pool-1-thread-6] c.c.y.d.consumer.MessageConsumer : 消费消息 =============== hello
2020-06-13 09:45:09.404 INFO 21580 --- [pool-1-thread-3] c.c.y.d.consumer.MessageConsumer : 消费消息 =============== hi
2020-06-13 09:45:09.404 INFO 21580 --- [pool-1-thread-2] c.c.y.d.consumer.MessageConsumer : 消费消息 =============== hello
2020-06-13 09:45:09.404 INFO 21580 --- [pool-1-thread-4] c.c.y.d.consumer.MessageConsumer : 消费消息 =============== hi
2020-06-13 09:45:09.404 INFO 21580 --- [pool-1-thread-5] c.c.y.d.consumer.MessageConsumer : 消费消息 =============== hello
一些方案
规避数据覆盖
使用 Disruptor,首先需要构建一个 RingBuffer,并指定一个大小,注意如果 RingBuffer 里面数据超过了这个大小则会覆盖旧数据。这可能是一个风险,但 Disruptor 提供了检查 RingBuffer 是否写满的机制用于规避这个问题。
复制 // if capacity less than 10%, don't use ringbuffer anymore
if ( ringBuffer . remainingCapacity () < RING_SIZE * 0.1 ) {
log . warn ( "disruptor:ringbuffer avaliable capacity is less than 10 %" );
return ;
}
// Publishers claim events in sequence
long sequence = ringBuffer . next ();
try {
TranslatorDataWrapper wrapper = ringBuffer . get (sequence);
wrapper . setCommand (command);
wrapper . setTarget (object);
} finally {
ringBuffer . publish (sequence);
}
Bless Bless!
参考文章:
高性能队列 Disruptor 的使用
蚂蚁金服分布式链路跟踪组件 SOFATracer 中 Disruptor 实践