Message Queue(MQ,消息队列中间件)作为高并发系统的核心组件之一,能够帮助业务系统解构提升开发效率和系统稳定性。主要具有以下优势:
削峰填谷:主要解决瞬时写压力大于应用服务能力导致消息丢失、系统奔溃等问题
系统解耦:解决不同重要程度、不同能力级别系统之间依赖导致一死全死
提升性能:当存在一对多调用时,可以发一条消息给消息系统,让消息系统通知相关系统
蓄流压测:线上有些链路不好压测,可以通过堆积一定量消息再放开来压测
消息中间件中有两个角色:消息生产者和消息消费者。RocketMQ 里同样有这两个概念,消息生产者负责创建消息并发送到 RocketMQ 服务器,RocketMQ 服务器会将消息持久化到磁盘,消息消费者从 RocketMQ 服务器拉取消息并提交给应用消费。
RocketMQ 是一款分布式、队列模型的消息中间件,具有以下特点:
支持严格的消息顺序
支持 Topic 与 Queue 两种模式
亿级消息堆积能力
比较友好的分布式特性
同时支持 Push 与 Pull 方式消费消息
RocketMQ 优势:
支持事务型消息(消息发送和 DB 操作保持两方的最终一致性,RabbitMQ 和 Kafka 不支持)
支持结合 RocketMQ 的多个系统之间数据最终一致性(多方事务,二方事务是前提)
支持 18 个级别的延迟消息(RabbitMQ 和 Kafka 不支持)
支持指定次数和时间间隔的失败消息重发(Kafka 不支持,RabbitMQ 需要手动确认)
支持 Consumer 端 Tag 过滤,减少不必要的网络传输(RabbitMQ 和 Kafka 不支持)
支持重复消费(RabbitMQ 不支持,Kafka 支持)
RocketMQ 有 namesrv 和 broker 组成。
namesrv 的功能如下:
接收 broker 的请求注册 broker 路由信息(master 和 slave)
接收 client 的请求根据某个 topic 获取所有到 broker 的路由信息
broker 则是 RocketMQ 真正存储消息的地方,broker 消息存储主要包括 3 个部分,分别 commitLog 的存储、consumeQueue 的存储、index 的存储。
Producer 和 Consumer 都是通过 namesrv 获取 broker 路由信息,连接到 broker 生产消费消息,namesrv 和 broker 可以分别集群部署,生产者消费者同样可以分别集群部署,物理部署架构图如下:
version: '3.5'services:rmqnamesrv:image: foxiswho/rocketmq:servercontainer_name: rmqnamesrvports:- 9876:9876volumes:- ./data/logs:/opt/logs- ./data/store:/opt/storenetworks:rmq:aliases:- rmqnamesrvrmqbroker:image: foxiswho/rocketmq:brokercontainer_name: rmqbrokerports:- 10909:10909- 10911:10911volumes:- ./data/logs:/opt/logs- ./data/store:/opt/store- ./data/brokerconf/broker.conf:/etc/rocketmq/broker.confenvironment:NAMESRV_ADDR: 'rmqnamesrv:9876'JAVA_OPTS: ' -Duser.home=/opt'JAVA_OPT_EXT: '-server -Xms128m -Xmx128m -Xmn128m'command: mqbroker -c /etc/rocketmq/broker.confdepends_on:- rmqnamesrvnetworks:rmq:aliases:- rmqbrokerrmqconsole:image: styletang/rocketmq-console-ngcontainer_name: rmqconsoleports:- 8080:8080environment:JAVA_OPTS: '-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false'depends_on:- rmqnamesrvnetworks:rmq:aliases:- rmqconsolenetworks:rmq:name: rmqdriver: bridge
需要注意 rocketmq-broker 与 rokcetmq-console 都需要与 rokcetmq-nameserver 连接,需要知道 nameserver ip。使用 docker-compose 之后,上面三个 docker 容器将会一起编排,可以直接使用容器名代替容器 ip,如这里 nameserver 容器名 rmqnamesrv。
RocketMQ Broker 需要一个配置文件,按照上面的 Compose 配置,在 ./data/brokerconf/
目录下创建一个名为 broker.conf
的配置文件,内容如下:
censed to the Apache Software Foundation (ASF) under one or more# contributor license agreements. See the NOTICE file distributed with# this work for additional information regarding copyright ownership.# The ASF licenses this file to You under the Apache License, Version 2.0# (the "License"); you may not use this file except in compliance with# the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.# 所属集群名字brokerClusterName=DefaultCluster# broker 名字,注意此处不同的配置文件填写的不一样,如果在 broker-a.properties 使用: broker-a,# 在 broker-b.properties 使用: broker-bbrokerName=broker-a# 0 表示 Master,> 0 表示 SlavebrokerId=0# nameServer地址,分号分割# namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876# 启动IP,如果 docker 报 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed# 解决方式1 加上一句 producer.setVipChannelEnabled(false);,解决方式2 brokerIP1 设置宿主机IP,不要使用docker 内部IPbrokerIP1=192.168.205.10# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数defaultTopicQueueNums=4# 是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭 !!!这里仔细看是 false,false,falseautoCreateTopicEnable=true# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭autoCreateSubscriptionGroup=true# Broker 对外服务的监听端口listenPort=10911# 删除文件时间点,默认凌晨4点deleteWhen=04# 文件保留时间,默认48小时fileReservedTime=120# commitLog 每个文件的大小默认1GmapedFileSizeCommitLog=1073741824# ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整mapedFileSizeConsumeQueue=300000# destroyMapedFileIntervalForcibly=120000# redeleteHangedFileInterval=120000# 检测物理文件磁盘空间diskMaxUsedSpaceRatio=88# 存储路径# storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store# commitLog 存储路径# storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog# 消费队列存储# storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue# 消息索引存储路径# storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index# checkpoint 文件存储路径# storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint# abort 文件存储路径# abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort# 限制的消息大小maxMessageSize=65536# flushCommitLogLeastPages=4# flushConsumeQueueLeastPages=2# flushCommitLogThoroughInterval=10000# flushConsumeQueueThoroughInterval=60000# Broker 的角色# - ASYNC_MASTER 异步复制Master# - SYNC_MASTER 同步双写Master# - SLAVEbrokerRole=ASYNC_MASTER# 刷盘方式# - ASYNC_FLUSH 异步刷盘# - SYNC_FLUSH 同步刷盘flushDiskType=ASYNC_FLUSH# 发消息线程池数量# sendMessageThreadPoolNums=128# 拉消息线程池数量# pullMessageThreadPoolNums=128
精简配置文件:
brokerClusterName = DefaultClusterbrokerName = broker-abrokerId = 0deleteWhen = 04fileReservedTime = 48brokerRole = ASYNC_MASTERflushDiskType = ASYNC_FLUSHbrokerIP1=192.168.205.10
配置完成,启动:
docker-compose up -d
访问 http://rmqIP:8080
进入控制台。
Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架。它基于 SpringBoot 来创建具有生产级别的单机 Spring 应用,并且使用 Spring Integration
与 Broker 进行连接。
Spring Cloud Stream 提供了消息中间件配置的统一抽象,推出了 publish-subscribe、consumer groups、partition 这些统一的概念(Kafka、RabbitMQ、RocketMQ 都是消息中间件)。
Spring Cloud Stream 内部有两个概念:Binder
和 Binding
。
Binder: 跟外部消息中间件集成的组件,用来创建 Binding,各消息中间件都有自己的 Binder 实现。
比如 Kafka 的实现 KafkaMessageChannelBinder,RabbitMQ 的实现 RabbitMessageChannelBinder 以及 RocketMQ 的实现 RocketMQMessageChannelBinder。
Binding: 包括 Input Binding 和 Output Binding。
Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。
这里只展示最简单 Demo,更多栗子可以参考 spring cloud alibaba rocketmq
首先,修改 pom.xml 文件,引入 RocketMQ Stream Starter。
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-stream-rocketmq</artifactId></dependency>
配置 Input 和 Output 的 Binding 信息并配合 @EnableBinding
注解使其生效
spring:cloud:stream:# 配置 rocketmq 的 nameserver 地址rocketmq:binder:name-server: 192.168.205.4:9876# 配置 input 和 output bindingbindings:input:content-type: text/plaindestination: test-topicgroup: test-groupoutput:content-type: text/plaindestination: test-topic
@SpringBootApplication@EnableBinding({ Source.class, Sink.class })public class AdminApplication {public static void main(String[] args) {SpringApplication.run(RocketMQApplication.class, args);}}
消息发送服务 SenderService:
@Servicepublic class SenderService {@Autowiredprivate MessageChannel output;public void send(String msg) throws Exception {output.send(MessageBuilder.withPayload(msg).build());}public <T> void sendWithTags(T msg, String tag) throws Exception {Message message = MessageBuilder.createMessage(msg,new MessageHeaders(Stream.of(tag).collect(Collectors.toMap(str -> MessageConst.PROPERTY_TAGS, String::toString))));output.send(message);}public <T> void sendObject(T msg, String tag) throws Exception {Message message = MessageBuilder.withPayload(msg).setHeader(MessageConst.PROPERTY_TAGS, tag).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON).build();output.send(message);}public <T> void sendTransactionalMsg(T msg, int num) throws Exception {MessageBuilder builder = MessageBuilder.withPayload(msg).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON);builder.setHeader("test", String.valueOf(num));builder.setHeader(RocketMQHeaders.TAGS, "binder");Message message = builder.build();output.send(message);}}
发送消息:
@Beanpublic CustomRunner customRunner() {return new CustomRunner("output");}public static class CustomRunner implements CommandLineRunner {private final String bindingName;public CustomRunner(String bindingName) {this.bindingName = bindingName;}@Autowiredprivate SenderService senderService;@Overridepublic void run(String... args) throws Exception {if (this.bindingName.equals("output")) {int count = 5;for (int index = 1; index <= count; index++) {String msgContent = "msg-" + index;if (index % 3 == 0) {senderService.send(msgContent);}else if (index % 3 == 1) {senderService.sendWithTags(msgContent, "tagStr");}else {senderService.sendObject(new Foo(index, "foo"), "tagObj");}}}}}@Datapublic static class Foo {private Integer inx;private String msg;Foo(Integer inx, String msg) {this.inx = inx;this.msg = msg;}}
消息接收服务 ReceiveService:
@Servicepublic class ReceiveService {@StreamListener("input")public void receiveInput1(String receiveMsg) {System.out.println("input receive: " + receiveMsg);}}
参考文章: 基于 Docker 安装 RocketMQ rocketmq 部署启动指南-Docker 版 Spring Alibaba RocketMQ