青年IT男

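I work in the financial industry and have been part of some of Chongqing's leading technology teams, including 易极付, 思建科技, and a ride-hailing platform. I currently work at a bank, where I am responsible for building its unified payment system. I have a strong interest in the financial industry, and I also work hands-on with big data, data storage, automated integration and deployment, distributed microservices, reactive programming, and artificial intelligence. I enjoy sharing what I learn, and I run a WeChat official account and a blog for that purpose. WeChat official account: 青年IT男.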

Disruptor Series: Usage

Preface

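We are at a critical stage in the fight against the novel coronavirus. Staying home and going out less, as the government has asked, is the greatest contribution ordinary people like us can make. When you do have to go out, wear a mask, and wash your hands with disinfectant or soap when you get home. I trust the country, the front-line workers, and the public: together we will beat this. The latest real-time figures are available from the official sources linked below.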

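Real-time epidemic data report

Tool for checking whether you shared a trip with a confirmed novel coronavirus pneumonia patient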

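This analysis of Disruptor is organized into two series: a usage series and a source-code analysis series. Disruptor involves a large number of concepts, which I will explain in more depth in the source-code series. This article is the introduction: it explains and demonstrates how to use Disruptor. When I read source code I follow one principle: first get the demo code running, so that I understand what is going on and how to get started, and only then dig deeper.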

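What Is Disruptor

You have surely used quite a few design patterns at work. One of the most common is the producer-consumer pattern. It appears in much middleware, such as Kafka and RabbitMQ, and in many places inside our own applications. For example, a typical business system has a lot of business logs to persist. Before persisting them we may need to clean and format them, then hand them to the message middleware, which feeds them into ELK. That calls for an intermediate transformation step, as shown in the figure: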

Disruptor_Logging

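The design requirement for this preprocessing step is fast, asynchronous, parallel processing. Several implementations come to mind, for example:

  • Use a queue as the staging area for the data and consume it with multiple threads
  • Send the logs as messages to MQ middleware first, then consume them
  • Use Disruptor, in a way similar to the first option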
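
As a rough illustration of the first option, here is a minimal sketch that uses only JDK classes. The class name QueueLogPipeline, the "__END__" marker, and the trim/lower-case "cleaning" step are assumptions made up for this example; real log cleaning would be more involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical example class; not part of Disruptor.
final class QueueLogPipeline {
    // Assumed end marker; raw logs are assumed never to equal this string.
    private static final String END = "__END__";

    /** Cleans raw log lines on worker threads that drain a bounded queue. */
    static List<String> process(List<String> rawLogs, int workers) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(16);
        List<String> cleaned = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            for (int i = 0; i < workers; i++) {
                pool.execute(() -> drain(queue, cleaned));
            }
            for (String log : rawLogs) {
                queue.put(log);   // blocks while the queue is full
            }
            for (int i = 0; i < workers; i++) {
                queue.put(END);   // one end marker per worker
            }
            pool.shutdown();
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return new ArrayList<>(cleaned);
    }

    // Worker loop: take lines until the end marker arrives.
    private static void drain(BlockingQueue<String> queue, List<String> cleaned) {
        try {
            for (String line = queue.take(); !END.equals(line); line = queue.take()) {
                cleaned.add(line.trim().toLowerCase(Locale.ROOT));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // The order of the cleaned lines depends on thread scheduling.
        System.out.println(process(Arrays.asList("  ERROR Disk Full ", "INFO Started"), 2));
    }
}
```

The bounded queue gives simple back-pressure: the producer blocks when the workers fall behind. Every hand-off goes through the queue's lock, which is the kind of contention Disruptor is designed to reduce.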

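Which option is best is not discussed here; that depends on the business scenario. For example, if logs are processed in memory, what happens when the process crashes, and is it acceptable to lose logs? If they are sent to MQ first, does that add at least two more MQ round trips? Factors like these have to be weighed together.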

Disruptor_Logging1

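So what exactly is Disruptor? In short: it gives us a producer-consumer style of usage, so that as users we only have to care about producing and consuming data. It is built on an in-memory array, so data can be located quickly. It supports multi-threaded production and multi-threaded consumption, and at the application level thread safety rarely needs attention. Disruptor also applies a number of optimizations that greatly improve the efficiency of the framework itself.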

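Advantages of Disruptor

Background: most data structures are ultimately built on one of two layouts. An array is allocated contiguously in memory and supports O(1) access by index. A linked list is scattered across non-contiguous locations, and reaching an element takes O(n).

  • Backed by an array, so an entry is located by index in O(1)
  • Cache-line padding to avoid false sharing
  • CAS operations to resolve contention on Sequence values among producer and consumer threads
  • Entries in the array are pre-allocated, which removes conditional checks
  • Entries are overwritten in place, which reduces GC
  • Eight WaitStrategy implementations designed for different scenarios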
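
To make a few of these points concrete, here is a simplified sketch of a pre-allocated ring with mask-based indexing and a CAS loop for claiming sequences. It is not Disruptor's actual implementation: the names MiniRing and Slot are hypothetical, and the sketch leaves out cache-line padding, wait strategies, and the gating that stops producers from overwriting entries consumers have not read yet.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration only; not Disruptor's RingBuffer.
final class MiniRing {
    /** Mutable slot that is allocated once and overwritten on every lap. */
    static final class Slot {
        long value;
    }

    private final Slot[] slots;
    private final int mask;
    private final AtomicLong cursor = new AtomicLong(-1);

    MiniRing(int size) {
        if (size < 1 || Integer.bitCount(size) != 1) {
            throw new IllegalArgumentException("size must be a power of two");
        }
        slots = new Slot[size];
        for (int i = 0; i < size; i++) {
            slots[i] = new Slot();   // pre-allocate every entry up front
        }
        mask = size - 1;
    }

    /** Claims the next sequence with a CAS loop so two producers never get the same one. */
    long claim() {
        long current;
        long next;
        do {
            current = cursor.get();
            next = current + 1;
        } while (!cursor.compareAndSet(current, next));
        return next;
    }

    /** O(1) lookup: for a power-of-two size, sequence & mask equals sequence % size. */
    Slot get(long sequence) {
        return slots[(int) (sequence & mask)];
    }

    public static void main(String[] args) {
        MiniRing ring = new MiniRing(4);
        long sequence = ring.claim();
        ring.get(sequence).value = 42;
        // Sequences 1 and 5 map to the same pre-allocated slot in a ring of size 4.
        System.out.println(ring.get(1) == ring.get(5));
    }
}
```

Because the slots are reused instead of reallocated, publishing an event creates no garbage of its own; only the payload written into the slot changes.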

Disruptor Usage Examples

Pipeliner: Pipeline Mode

Pipelier_Disruptor

package com.lmax.disruptor.example;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.DaemonThreadFactory;

import java.util.concurrent.TimeUnit;

/***
 *@className Pipeliner
 *
 *@description Processes events concurrently on several threads, then joins the results; see the diagram I drew
 *<a href="http://youngitman.tech/wp-content/uploads/2020/02/Pipelier_Disruptor.png">flow diagram</a>
 *
 *@author <a href="http://youngitman.tech">青年IT男</a>
 *
 *@date 22:43 2020-02-04
 *
 *@JunitTest: {@link  }
 *
 *@version v1.0.0
 *
 **/
public class Pipeliner {
    public static void main(String[] args) throws InterruptedException {
        // Create a Disruptor with a single producer thread and a BlockingWaitStrategy.
        // The ring buffer size (4) must be a power of two.
        Disruptor<PipelinerEvent> disruptor = new Disruptor<>(PipelinerEvent.FACTORY, 4, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BlockingWaitStrategy());

        // Three handlers consume concurrently; JoiningHandler runs only after all three have seen an event
        disruptor.handleEventsWith(
                // Event handlers
                new ParallelHandler(0, 3),
                new ParallelHandler(1, 3),
                new ParallelHandler(2, 3)
        ).then(new JoiningHandler());

        // Start the consumer threads
        RingBuffer<PipelinerEvent> ringBuffer = disruptor.start();

        for (int i = 0; i < 20; i++) {
            // Claim the next available sequence for publishing
            long next = ringBuffer.next();
            try {
                // Fetch the event at that sequence; no null check is needed because Disruptor pre-allocates every slot
                PipelinerEvent pipelinerEvent = ringBuffer.get(next);
                // Set the payload carried by the event according to your business needs
                pipelinerEvent.input = i;
            } finally {
                // Publish the event
                ringBuffer.publish(next);
            }
        }

        // Keep the JVM alive (the handler threads are daemons) so the output can be observed
        TimeUnit.MILLISECONDS.sleep(300_000);
    }

    /***
     *@className ParallelHandler
     *
     *@description Processes events in parallel; events are split across the handler threads by sequence number
     *
     *@author <a href="http://youngitman.tech">青年IT男</a>
     *
     *@date 22:43 2020-02-04
     *
     *@JunitTest: {@link  }
     *
     *@version v1.0.0
     *
     **/
    private static class ParallelHandler implements EventHandler<PipelinerEvent> {
        private final int ordinal;
        private final int totalHandlers;

        ParallelHandler(int ordinal, int totalHandlers) {
            this.ordinal = ordinal;
            this.totalHandlers = totalHandlers;
        }

        @Override
        public void onEvent(PipelinerEvent event, long sequence, boolean endOfBatch) throws Exception {
            // Only handle the events assigned to this handler
            if (sequence % totalHandlers == ordinal) {
                event.result = "The thread name is " + Thread.currentThread().getName() + " and The Data is " + event.input;
            } else {
                // Not this handler's event; one of the other ParallelHandlers fills in the result
            }
        }
    }

    private static class JoiningHandler implements EventHandler<PipelinerEvent> {
        private long lastEvent = -1;

        @Override
        public void onEvent(PipelinerEvent event, long sequence, boolean endOfBatch) throws Exception {
            // Report an error if events arrive out of order or no ParallelHandler filled in the result
            if (event.input != lastEvent + 1 || event.result == null) {
                System.out.println("Error: " + event);
            }
            System.out.println("The PipelinerEvent data is " + event + " and has been handled");
            lastEvent = event.input;
            event.result = null;

        }
    }

    private static class PipelinerEvent {
        long input;
        Object result;

        private static final EventFactory<PipelinerEvent> FACTORY = new EventFactory<PipelinerEvent>() {
            @Override
            public PipelinerEvent newInstance() {
                return new PipelinerEvent();
            }
        };

        @Override
        public String toString() {
            return "PipelinerEvent{" +
                    "input=" + input +
                    ", result=" + result +
                    '}';
        }
    }
}
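
The partitioning rule used by ParallelHandler above (sequence % totalHandlers == ordinal) can be checked in isolation. The following is a small stand-alone sketch; the class name PartitionDemo and the helper ownerOf are hypothetical and exist only for this illustration.

```java
// Hypothetical helper that mirrors the check in ParallelHandler.onEvent.
final class PartitionDemo {
    /** Returns the ordinal of the handler that owns the given sequence. */
    static int ownerOf(long sequence, int totalHandlers) {
        return (int) (sequence % totalHandlers);
    }

    public static void main(String[] args) {
        // Show which of three handlers would fill in the result for each sequence.
        for (long sequence = 0; sequence < 6; sequence++) {
            System.out.println("sequence " + sequence + " -> handler " + ownerOf(sequence, 3));
        }
    }
}
```

With three handlers, sequences 0, 3, 6 and so on belong to handler 0, sequences 1, 4, 7 to handler 1, and sequences 2, 5, 8 to handler 2. Every handler still sees every event; the check only decides which one does the work.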

Serialize: Serial Mode

Distruptor_Serialize01

package com.lmax.disruptor.example;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.DaemonThreadFactory;

import java.util.concurrent.TimeUnit;

/**
 * @author <a href="http://youngitman.tech">青年IT男</a>
 * @version v1.0.0
 * @className SingleProducerWithSerializeConsumer
 * @description
 * @date 2020-02-06 14:56
 * @JunitTest: {@link  }
 **/
public class SingleProducerWithSerializeConsumer {
    private static class MyEvent {
        private Object a;
        private Object b;
        private Object c;
        private Object d;

        @Override
        public String toString() {
            return a + ":" + b + ":" + c + ":" + d;
        }
    }

    private static EventFactory<MyEvent> factory = () -> new SingleProducerWithSerializeConsumer.MyEvent();

    private static EventHandler<SingleProducerWithSerializeConsumer.MyEvent> handler1 = (event, sequence, endOfBatch) -> {
        event.b = event.a;
        System.out.println("The " + event + " data is handled by handler1, and the thread name is " + Thread.currentThread().getName());
    };

    private static EventHandler<SingleProducerWithSerializeConsumer.MyEvent> handler2 = (event, sequence, endOfBatch) -> {
        event.c = event.b;
        System.out.println("The " + event + " data is handled by handler2, and the thread name is " + Thread.currentThread().getName());
    };

    private static EventHandler<SingleProducerWithSerializeConsumer.MyEvent> handler3 = (event, sequence, endOfBatch) -> {
        event.d = event.c;
        System.out.println("The " + event + " data is handled by handler3, and the thread name is " + Thread.currentThread().getName());
    };

    public static void main(String[] args) throws InterruptedException {

        serialize01();

        serialize02();

        // Keep the JVM alive so the daemon handler threads can finish
        TimeUnit.MILLISECONDS.sleep(1000_000);
    }

    private static void publish(Disruptor<MyEvent> disruptor) {
        // Start the consumer threads and obtain the ring buffer
        RingBuffer<MyEvent> ringBuffer = disruptor.start();

        for (int i = 0; i < 20; i++) {
            // Claim a slot, fill it in, then publish it
            long sequence = ringBuffer.next();
            try {
                MyEvent event = ringBuffer.get(sequence);
                event.a = 10;
            } finally {
                ringBuffer.publish(sequence);
            }
        }
    }

    /**
     * Single producer thread; several consumer threads process each event one after another.
     *
     * @author liyong
     * @date 15:06 2020-02-06
     */
    private static void serialize01() {
        // Create a Disruptor with a single producer thread and a BlockingWaitStrategy
        Disruptor<SingleProducerWithSerializeConsumer.MyEvent> disruptor = new Disruptor<>(factory, 4, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BlockingWaitStrategy());

        disruptor.handleEventsWith(handler1).then(handler2).then(handler3);

        publish(disruptor);
    }

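    /**
     * Multi-producer configuration; several consumer threads process each event one after another.
     * This demo starts only one publisher thread.
     *
     * @author liyong
     * @date 15:06 2020-02-06
     */
    private static void serialize02() {
        // Create a Disruptor that allows multiple producer threads (ProducerType.MULTI), with a BlockingWaitStrategy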
        Disruptor<SingleProducerWithSerializeConsumer.MyEvent> disruptor = new Disruptor<>(factory, 4, DaemonThreadFactory.INSTANCE, ProducerType.MULTI, new BlockingWaitStrategy());

        disruptor.handleEventsWith(handler1).then(handler2).then(handler3);

        new Thread(()->{
            publish(disruptor);
        }).start();
    }
}

Multi-threaded Production and Consumption

Disruptor_MultThread

package com.lmax.disruptor.example;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.DaemonThreadFactory;

import java.util.concurrent.TimeUnit;

/**
 *
 * Produces data on multiple threads and consumes it on multiple threads
 *
 * @author <a href="http://youngitman.tech">青年IT男</a>
 * @version v1.0.0
 * @className MultProducerWithMultConsumer
 * @description
 * @date 2020-02-06 14:56
 * @JunitTest: {@link  }
 **/
public class MultProducerWithMultConsumer {
    private static class MyEvent {
        private Object a;
        private Object b;
        private Object c;
        private Object d;

        @Override
        public String toString() {
            return a + ":" + b + ":" + c + ":" + d;
        }
    }

    private static EventFactory<MyEvent> factory = () -> new MultProducerWithMultConsumer.MyEvent();

    private static EventHandler<MultProducerWithMultConsumer.MyEvent> handler1 = (event, sequence, endOfBatch) -> {
        event.b = event.a;
        System.out.println("The " + event + " data is handled by handler1, and the thread name is " + Thread.currentThread().getName());
    };

    private static EventHandler<MultProducerWithMultConsumer.MyEvent> handler2 = (event, sequence, endOfBatch) -> {
        event.c = event.b;
        System.out.println("The " + event + " data is handled by handler2, and the thread name is " + Thread.currentThread().getName());
    };

    private static EventHandler<MultProducerWithMultConsumer.MyEvent> handler3 = (event, sequence, endOfBatch) -> {
        event.d = event.c;
        System.out.println("The " + event + " data is handled by handler3, and the thread name is " + Thread.currentThread().getName());
    };

    public static void main(String[] args) throws InterruptedException {

        serialize02();

        // Keep the JVM alive so the daemon handler threads can finish
        TimeUnit.MILLISECONDS.sleep(1000_000);
    }

    private static void publish(Disruptor<MyEvent> disruptor) {
        // Start the consumer threads and obtain the ring buffer
        RingBuffer<MyEvent> ringBuffer = disruptor.start();

        for (int i = 0; i < 20; i++) {
            // Claim a slot, fill it in, then publish it
            long sequence = ringBuffer.next();
            try {
                MyEvent event = ringBuffer.get(sequence);
                event.a = 10;
            } finally {
                ringBuffer.publish(sequence);
            }
        }
    }

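    /**
     * Multi-producer configuration; the three handlers consume every event concurrently.
     * This demo starts only one publisher thread.
     *
     * @author liyong
     * @date 15:06 2020-02-06
     */
    private static void serialize02() {
        // Create a Disruptor that allows multiple producer threads (ProducerType.MULTI), with a BlockingWaitStrategy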
        Disruptor<MultProducerWithMultConsumer.MyEvent> disruptor = new Disruptor<>(factory, 4, DaemonThreadFactory.INSTANCE, ProducerType.MULTI, new BlockingWaitStrategy());

        disruptor.handleEventsWith(handler1,handler2,handler3);

        new Thread(()->{
            publish(disruptor);
        }).start();
    }
}

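Cached Consumption Mode

After data is pulled from the RingBuffer it is cached locally, so the producer can keep producing while the consumer reads from the cache first. This suits scenarios where the consumer is slow and the producer is fast.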
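
The idea can be sketched without Disruptor at all. In the following minimal sketch a java.util.concurrent.BlockingQueue stands in for the RingBuffer, and drainTo stands in for the batched poll; the class name LocalCacheConsumer and its methods are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration: a BlockingQueue stands in for the RingBuffer.
final class LocalCacheConsumer<T> {
    private final BlockingQueue<T> source;
    private final ArrayDeque<T> local = new ArrayDeque<>();
    private final int maxBatch;

    LocalCacheConsumer(BlockingQueue<T> source, int maxBatch) {
        this.source = source;
        this.maxBatch = maxBatch;
    }

    /** Returns the next item, or null when both the local cache and the source are empty. */
    T poll() {
        if (local.isEmpty()) {
            // Cache miss: move up to maxBatch items from the shared source in one call.
            source.drainTo(local, maxBatch);
        }
        return local.poll();
    }

    /** Number of items currently held in the local cache. */
    int cachedCount() {
        return local.size();
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> source = new LinkedBlockingQueue<>();
        for (int i = 1; i <= 5; i++) {
            source.add(i);
        }
        LocalCacheConsumer<Integer> consumer = new LocalCacheConsumer<>(source, 3);
        Integer value;
        while ((value = consumer.poll()) != null) {
            System.out.println("consumed " + value + ", still cached locally: " + consumer.cachedCount());
        }
    }
}
```

The consumer touches the shared structure only once per batch; every other read is served from memory that belongs to the consumer alone. The Disruptor version of the same pattern follows.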

Disruptor_Cache

package com.lmax.disruptor.example;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventPoller;
import com.lmax.disruptor.RingBuffer;

import java.util.concurrent.TimeUnit;

/**
 * Works like a multi-level cache: read from the local cache first, and go to the RingBuffer only on a miss.
 * Each fetch pulls the currently available events (up to the batch size) into the local cache,
 * and subsequent reads are served from that cache.
 * Alternative usage of EventPoller, here we wrap it around BatchedEventPoller
 * to achieve Disruptor's batching. this speeds up the polling feature
 */
public class PullWithBatchedPoller {
    public static void main(String[] args) throws Exception {
        int batchSize = 40;
        RingBuffer<BatchedPoller.DataEvent<Object>> ringBuffer = RingBuffer.createMultiProducer(BatchedPoller.DataEvent.factory(), 1024);

        for (int i = 0; i < 2; i++) {
            new Thread(() -> {
                // Each thread maintains its own poller
                BatchedPoller<Object> poller = new BatchedPoller<Object>(ringBuffer, batchSize);

                while (true){

                    Object value = null;
                    try {
                        value = poller.poll();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }

                    // Value could be null if no events are available.
                    if (null != value) {
                        // Process value.
                        System.out.println("The thread number is " + Thread.currentThread().getName() + " for consumer data " + value);
                    }else {
                        try {
                            TimeUnit.MILLISECONDS.sleep(1_000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }

            }).start();
        }

        TimeUnit.MILLISECONDS.sleep(200);

        for (int i = 0; i < 20; i++) {
            // Publish data
            long sequence = ringBuffer.next();

            BatchedPoller.DataEvent<Object> event = ringBuffer.get(sequence);
            event.data = i;

            ringBuffer.publish(sequence);
        }

        TimeUnit.MILLISECONDS.sleep(50_000);

    }
}

class BatchedPoller<T> {

    private final EventPoller<BatchedPoller.DataEvent<T>> poller;
    private final int maxBatchSize;
    private final BatchedData<T> polledData;

    BatchedPoller(RingBuffer<BatchedPoller.DataEvent<T>> ringBuffer, int batchSize) {
        this.poller = ringBuffer.newPoller();
        // Register the poller's Sequence as a gating sequence so the producer cannot overwrite unread events
        ringBuffer.addGatingSequences(poller.getSequence());

        if (batchSize < 1) {
            batchSize = 20;
        }
        this.maxBatchSize = batchSize;
        this.polledData = new BatchedData<T>(this.maxBatchSize);
    }

    public T poll() throws Exception {
        // Serve from the local cache if it has data
        if (polledData.getMsgCount() > 0) {
            return polledData.pollMessage(); // we just fetch from our local
        }
        // Otherwise load a batch of available events from the RingBuffer
        loadNextValues(poller, polledData); // we try to load from the ring
        // Return from the local cache if the load produced anything
        return polledData.getMsgCount() > 0 ? polledData.pollMessage() : null;
    }

    private EventPoller.PollState loadNextValues(EventPoller<BatchedPoller.DataEvent<T>> poller, final BatchedData<T> batch)
            throws Exception {
        // Poll the RingBuffer; returning false from the callback stops the current batch
        return poller.poll((event, sequence, endOfBatch) -> {
            T item = event.copyOfData();
            // Add to the local cache
            return item != null && batch.addDataItem(item);
        });
    }

    public static class DataEvent<T> {

        T data;

        public static <T> EventFactory<BatchedPoller.DataEvent<T>> factory() {
            return () -> new DataEvent<T>();
        }

        public T copyOfData() {
            // Copy the data out here. In this case we have a single reference
            // object, so the pass by
            // reference is sufficient. But if we were reusing a byte array,
            // then we
            // would need to copy
            // the actual contents.
            return data;
        }

        void set(T d) {
            data = d;
        }

    }

    /***
     *@className BatchedData
     *
     *@description Encapsulates the operations on the locally cached data
     *
     *@author <a href="http://youngitman.tech">青年IT男</a>
     *
     *@date 18:12 2020-02-06
     *
     *@JunitTest: {@link  }
     *
     *@version v1.0.0
     *
    **/
    private static class BatchedData<T> {

        private int msgHighBound;
        private final int capacity;
        private final T[] data;
        private int cursor;

        @SuppressWarnings("unchecked")
        BatchedData(int size) {
            this.capacity = size;
            data = (T[]) new Object[this.capacity];
        }

        private void clearCount() {
            msgHighBound = 0;
            cursor = 0;
        }

        public int getMsgCount() {
            return msgHighBound - cursor;
        }

        /***
         *
         * Adds an item to the local cache
         *
         * @author liyong
         * @date 23:29 2020-02-04
         * @param item
         * @exception
         * @return boolean
         **/
        public boolean addDataItem(T item) throws IndexOutOfBoundsException {
            if (msgHighBound >= capacity) {
                throw new IndexOutOfBoundsException("Attempting to add item to full batch");
            }

            data[msgHighBound++] = item;
            return msgHighBound < capacity;
        }

        /***
         *
         * Takes the next item from the local cache
         *
         * @author liyong
         * @date 23:28 2020-02-04
         * @param
         * @exception
         * @return T
         **/
        public T pollMessage() {
            T rtVal = null;
            if (cursor < msgHighBound) {
                rtVal = data[cursor++];
            }
            // Reset both pointers once the cache has been drained
            if (cursor > 0 && cursor >= msgHighBound) {
                clearCount();
            }
            return rtVal;
        }
    }
}

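Source Code

I forked the official Git repository, adjusted the relevant demos, and added comments to the source. Everything has been committed to the https://github.com/liyong1028826685/disruptor.git repository.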

About the Author

My blog
WeChat official account


