RocketMQ

更新时间:2023-10-18 10:07:01 阅读量: 综合文库 文档下载

说明:文章内容仅供预览,部分内容可能不全。下载后的文档,内容与下面显示的完全一致。下载之前请确认下面内容是否您想要的,是否完整无缺。

Rocket介绍:

发展历史

大约经历了三个主要版本迭代

一、Metaq 1.x :开源社区killme2008维护,开源社区非常活跃 二、Metaq 2.x :于2012年10月份在淘宝内部上线,并广泛使用

三、RocketMq3.x:阿里内部对其核心功能的简化。并衍生出多个消息服务项目。

运用到阿里的支付、订单、充值等多个业务领域。

MQ对比

关注度 成熟度 社区 社区活跃度 文档 特点 ActiveMQ 高 成熟 Apache 高 多 功能齐全,被大量使用 RabbitMQ 高 成熟 Mozilla开源社区 高 多 由于Erlang语言的并发能力,使得性能很好 RocketMQ 中 比较成熟 Alibaba 中 少 各个环节分布式扩展设计,主从高可用群集;支持上万个队列;多种消费模式;性能很好 开源 Java 开源 语言 开源 Java 开源 Erlang(面向并发编程语言) Client语言 支持协议 支持Java OpenWire、STOMP、REST、XMPP、AMQP 支持Java AMQP 支持Java 自定义的一套,提供了支持JMS客户端的API 持久化 事务 集群和负载均衡 管理页面 部署方式 评价 内存,文件,数据库 支持 支持 内存,文件 不支持 支持 磁盘文件 支持 支持 一般 独立,嵌入 优点:成熟的产品,已经在很多公司得到应用(非大规模场景)。有较多的文档。各种协议支持较好,有多重语言的成熟的客户好 独立 优点:由于erlang语言的特性,mq性能较好;管理界面较丰富,在互联网公司也有较大规模的应用;无 独立 优点:模型简单,接口易用。在阿里大规模应用。目前支付宝中的余额宝等新兴产品均使用rocketmq。集群规模大概在50台左右,单日处理消息上百亿;性能非常好,可以大量堆积消1

端; 会出莫名其妙的问题,并且会丢消息。其重心放到activemq6.0产品—apollo上去了,目前社区不活跃,且对5.x维护较少; Activemq 不适合用于上千个队列的应用场景 支持amqp协议,有多语言且支持 amqp 的客 户端可用。 缺点:erlang语言难度较大。 息在 broker 中;支持多种消费,包括集群消费、广播消费等。开发度较活跃,版本 更新很快。 缺点:产品较新,文档比较缺乏。没有在 mq 核心中去实现 JMS 等接口,对 已有系统而言不能兼容。 缺点:根据其他用户反馈,中 RocketMQ与Kafka对比

Kafka无限消息堆积,高效的持久化速度,主要用于日志传输。

RocketMQ广泛应用订单、交易、充值、消息推送、日志传输场景。 数据可靠性 RocketMQ 支持异步实时刷盘,同步刷盘,同步主从复制和异步主从复制 总结 总结:RocketMQ的同步刷盘在单机可靠性上比Kafka更高,不会因为操作系统崩溃,导致数据丢失。同时同步Replication也比Kafka异步Replication更可靠,数据完全无单点。另外Kafka的Replication以topic为单位,支持主机宕机,备机自动切换,但是这里有个问题,由于是异步Replication,那么切换后会有数据丢失,同时宕机的机器如果重启后,会与已经存在的主机器产生数据冲突(?)。开源版本的RocketMQ不支持Master宕机,Slave自动切换为Master,阿里云版本的RocketMQ支持自动切换特性 性能 RocketMQ单机写入TPS单实例约可以跑到最高12万条/秒,消息大小10个字节 总结 Kafka的TPS跑到单机百万,主要是由于生产者端将多个小消息合并,批量发向Broker。 RocketMQ为什么没有这么做? 生产者通常使用Java语言,缓存过多消息,GC是个很严重的问题 生产者调用发送消息接口,消息未发送到Broker,向业务返回成功,此时生产者宕机,会导致消息丢失,业务出错 生产者通常为分布式系统,且每台机器都是多线程发送,我们认为线上的系统单个生产者每秒产生的数据量有限,不可能上万。 缓存的功能完全可以由上层业务完成。 单机支持队列数 RocketMQ单机支持最高5万个队列,Load不会发生明显变化 Kafka单机超过64个队列/分区,Load会发生明显的飙高现象,队列越多,load越高,发送消息响应时间变长 2

Kafka单机写入TPS约在百万条/ Kafka 异步刷盘方式和异步主从复制 7万条/秒,单机部署3个Broker,秒,消息大小10个字节 消息投递实时性 RocketMQ使用长轮询,同Push方式实时性一致,消息的投递延时通常在几个毫秒。 Kafka使用短轮询方式,实时性取决于轮询间隔时间 Kafka消费失败不支持重试 消息失败重试 RocketMQ消费失败支持定时重试,每次重试间隔时间顺延 总结 例如充值类应用,当前时刻调用运营商网关,充值失败,可能是对方压力过多,稍后在调用就会成功,如支付宝到银行扣款也是类似需求。 这里的重试需要可靠的重试,即失败重试的消息不因为Consumer宕机导致丢失 严格的消息顺序 RocketMQ支持严格的消息顺序,在顺序消息场景下,一台Broker宕机后,发送消息会失败,但是不会乱序 Kafka支持消息顺序,但是一台Broker宕机后,就会产生消息乱序 Kafka不支持定时消息 定时消息 RocketMQ支持两类定时消息 开源版本RocketMQ仅支持定时Level 阿里云ONS支持定时Level,以及指定的毫秒级别的延时时间 分布式事务消息 消息查询 支持分布式事务消息 RocketMQ支持根据Message Id查询消息,也支持根据消息内容查询消息(发送消息时指定一个Message Key,任意字符串,例如指定为订单Id) Kafka不支持分布式事务消息 Kafka不支持消息查询 总结 消息查询对于定位消息丢失问题非常有帮助,例如某个订单处理失败,是消息没收到还是收到处理出错了。 消息回溯 RocketMQ支持按照时间来回溯消息,精度毫秒,例如从一天之前的某时某分某秒开始重新消费消息 Kafka理论上可以按照Offset来回溯消息 总结 典型业务场景如consumer做订单分析,但是由于程序逻辑或者依赖的系统发生故障等原因,导致今天消费的消息全部无效,需要重新从昨天零点开始消费,那么以时间为起点的消息重放功能对于业务非常有帮助。 消息并行度 RocketMQ消费并行度分两种情况1.顺序消费方式并行度同Kafka完全一致2.乱序方式并行度取决于Consumer的线程数,如Topic配置10个队列,10台机器消费,每台机器100个线程,那么并行度为1000。 Kafka的消费并行度依赖Topic配置的分区数,如分区数为10,那么最多10台机器来并行消费(每台机器只能开启一个线程),或者一台机器消费(10个线程并行消费)。即消费并行度和分区数一致。 Kafka不支持Broker端的消息过滤 Broker端消息过滤 RocketMQ支持两种Broker端消息过滤方式 1.根据Message Tag来过滤,相当于子topic概念2.向服务器上传一3

段Java代码,可以对消息做任意形式的过滤,甚至可以做Message Body的过滤拆分。 消息堆积能力 理论上Kafka要比RocketMQ的堆积能力更强,不过RocketMQ单机也可以支持亿级的消息堆积能力,我们认为这个堆积能力已经完全可以满足业务需求 成熟度 RocketMQ在阿里集团内部有大量的应用在使用,每天都产生海量的消息,并且顺利支持了多次天猫双十一海量消息考验,是数据削峰填谷的利器。 Kafka在日志领域比较成熟。 特性

1、支持严格的消息顺序;

2、支持Topic与Queue两种模式; 3、亿级消息堆积能力;

4、比较友好的分布式特性;

5、同时支持Push与Pull方式消费消息;

Topic发布订阅模式

topic数据默认不落地,是无状态的。并不保证publisher发布的每条数据,Subscriber都能接受到。一般来说publisher发布消息到某一个topic时,只有正在监听该topic地址的sub能够接收到消息;如果没有sub在监听,该topic就丢失了。 一对多的消息发布接收策略,监听同一个topic地址的多个sub都能收到publisher发送的消息。Sub接收完通知mq服务器。 Queue点对点模式

Queue数据默认会在mq服务器上以文件形式保存,也可以配置成DB存储。Queue保证每条数据都能被receiver接收。Sender发送消息到目标Queue,receiver可以异步接收这个Queue上的消息。Queue上的消息如果暂时没有receiver来取,也不会丢失。一对一的消息发布接收策略,一个sender发送的消息,只能有一个receiver接收。receiver接收完后,通知mq服务器已接收,mq服务器对queue里的消息采取删除或其他操作。

Push 推送

类似于Broker Push消息到Consumer方式,但实际仍然是Consumer内部后台从Broker Pull消息

采用长轮询方式拉消息,实时性同push方式一致,且不会无谓的拉消息导致Broker、Consumer压力增大

Pull 拉取

短轮询方式,可以设定轮询时间间隔

4

版本 3.2.6

部署图

RocketMq主要包含三个服务:nameserver、broker、filterserver

Nameserver负责维护broker列表和路由功能。 Broker负责消息的读取和存储 Filterserver负责过滤查询

1. nameserver:稳定性非常高。因为nameserver相互独立,彼此没有通信关系,单台

nameserver挂掉,不影响其他nameserver,即使全部挂掉,也不影响业务系统使用,这点类似于dubbo的zookeeper。nameserver不会有频繁的读写,所以性能开销非常小,稳定性很高。

2. broker

1) 与nameserver关系 连接:

单个broker和所有nameserver保持长连接 心跳:

心跳间隔:每隔30秒(此时间无法更改)向所有nameserver发送心跳,心跳包含了自身的topic配置信息。 心跳超时:nameserver每隔10秒钟(此时间无法更改),扫描所有还存活的broker连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则断开连接。 断开:

时机:broker挂掉;心跳超时导致nameserver主动关闭连接

5

动作:一旦连接断开,nameserver会立即感知,更新topc与队列的对应关系,但不会通知生产者和消费者 2) 负载均衡

一个topic分布在多个broker上,一个broker可以配置多个topic,它们是多对多的关系。 如果某个topic消息量很大,应该给它多配置几个队列,并且尽量多分布在不同broker上,减轻某个broker的压力。

topic消息量都比较均匀的情况下,如果某个broker上的队列越多,则该broker压力越大。

3) 可用性

由于消息分布在各个broker上,一旦某个broker宕机,则该broker上的消息读写都会受到影响。所以rocketmq提供了master/slave的结构,salve定时从master同步数据,如果master宕机,则slave提供消费服务,但是不能写入消息,此过程对应用透明,由rocketmq内部解决。

这里有两个关键点:

一旦某个broker master宕机,生产者和消费者多久才能发现?受限于rocketmq的网络连接机制,默认情况下,最多需要30秒,但这个时间可由应用设定参数来缩短时间。这个时间段内,发往该broker的消息都是失败的,而且该broker的消息无法消费,因为此时消费者不知道该broker已经挂掉。

消费者得到master宕机通知后,转向slave消费,但是slave不能保证master的消息100%都同步过来了,因此会有少量的消息丢失。但是消息最终不会丢的,一旦master恢复,未同步过去的消息会被消费掉。

4) 可靠性

所有发往broker的消息,有同步刷盘和异步刷盘机制,总的来说,可靠性非常高 同步刷盘时,消息写入物理文件才会返回成功,因此非常可靠

异步刷盘时,只有机器宕机,才会产生消息丢失,broker挂掉可能会发生,但是机器宕机崩溃是很少发生的,除非突然断电 5) 消息清理 扫描间隔:

默认10秒,由broker配置参数cleanResourceInterval决定 空间阈值:

物理文件不能无限制的一直存储在磁盘,当磁盘空间达到阈值时,不再接受消息,broker打印出日志,消息发送失败,阈值为固定值85% 清理时机:

默认每天凌晨4点,由broker配置参数deleteWhen决定;或者磁盘空间达到阈值 文件保留时长:

默认72小时,由broker配置参数fileReservedTime决定 6) 读写性能

文件内存映射方式操作文件,避免read/write系统调用和实时文件读写,性能非常高 永远一个文件在写,其他文件在读 顺序写,随机读

利用linux的sendfile机制,将消息内容直接输出到sokect管道,避免系统调用 7) 系统特性

大内存,内存越大性能越高,否则系统swap会成为性能瓶颈

6

IO密集

cpu load高,使用率低,因为cpu占用后,大部分时间在IO WAIT 磁盘可靠性要求高,为了兼顾安全和性能,采用RAID10阵列 磁盘读取速度要求快,要求高转速大容量磁盘

3. 消费者

1) 与nameserver关系 连接:

单个消费者和一台nameserver保持长连接,定时查询topic配置信息,如果该nameserver挂掉,消费者会自动连接下一个nameserver,直到有可用连接为止,并能自动重连。 心跳:

与nameserver没有心跳 轮询时间:

默认情况下,消费者每隔30秒从nameserver获取所有topic的最新队列情况,这意味着某个broker如果宕机,客户端最多要30秒才能感知。该时间由DefaultMQPushConsumer的pollNameServerInteval参数决定,可手动配置。

2) 与broker关系 连接

单个消费者和该消费者关联的所有broker保持长连接。 心跳

默认情况下,消费者每隔30秒向所有broker发送心跳,该时间由DefaultMQPushConsumer的heartbeatBrokerInterval参数决定,可手动配置。broker每隔10秒钟(此时间无法更改),扫描所有还存活的连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则关闭连接,并向该消费者分组的所有消费者发出通知,分组内消费者重新分配队列继续消费 断开:

时机:消费者挂掉;心跳超时导致broker主动关闭连接

动作:一旦连接断开,broker会立即感知到,并向该消费者分组的所有消费者发出通知,分组内消费者重新分配队列继续消费

3) 负载均衡

集群消费模式下,一个消费者集群多台机器共同消费一个topic的多个队列,一个队列只会被一个消费者消费。如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。

4) 消费机制 本地队列

消费者不间断的从broker拉取消息,消息拉取到本地队列,然后本地消费线程消费本地消息队列,只是一个异步过程,拉取线程不会等待本地消费线程,这种模式实时性非常高。对消费者对本地队列有一个保护,因此本地消息队列不能无限大,否则可能会占用大量内存,本地队列大小由DefaultMQPushConsumer的pullThresholdForQueue属性控制,默认1000,可手动设置。 轮询间隔

消息拉取线程每隔多久拉取一次?间隔时间由DefaultMQPushConsumer的pullInterval

7

属性控制,默认为0,可手动设置。 消息消费数量

监听器每次接受本地队列的消息是多少条?这个参数由DefaultMQPushConsumer的consumeMessageBatchMaxSize属性控制,默认为1,可手动设置。

5) 消费进度存储

每隔一段时间将各个队列的消费进度存储到对应的broker上,该时间由DefaultMQPushConsumer的persistConsumerOffsetInterval属性控制,默认为5秒,可手动设置。

6) 如果一个topic在某broker上有3个队列,一个消费者消费这3个队列,那么该消费者

和这个broker有几个连接?

一个连接,消费单位与队列相关,消费连接只跟broker相关,事实上,消费者将所有队列的消息拉取任务放到本地的队列,挨个拉取,拉取完毕后,又将拉取任务放到队尾,然后执行下一个拉取任务

4. 生产者

1) 与nameserver关系 连接:

单个生产者者和一台nameserver保持长连接,定时查询topic配置信息,如果该nameserver挂掉,生产者会自动连接下一个nameserver,直到有可用连接为止,并能自动重连。 轮询时间:

默认情况下,生产者每隔30秒从nameserver获取所有topic的最新队列情况,这意味着某个broker如果宕机,生产者最多要30秒才能感知,在此期间,发往该broker的消息发送失败。该时间由DefaultMQProducer的pollNameServerInteval参数决定,可手动配置。 心跳:

与nameserver没有心跳

2) 与broker关系 连接:

单个生产者和该生产者关联的所有broker保持长连接。 心跳:

默认情况下,生产者每隔30秒向所有broker发送心跳,该时间由DefaultMQProducer的heartbeatBrokerInterval参数决定,可手动配置。broker每隔10秒钟(此时间无法更改),扫描所有还存活的连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则关闭连接。 连接断开

移除broker上的生产者信息

3) 负载均衡

生产者时间没有关系,每个生产者向队列轮流发送消息

8

部署Broker

Broker集群有多种配置方式: 1,单Master

优点:除了配置简单没什么优点

缺点:不可靠,该机器重启或宕机,将导致整个服务不可用 2,多Master

一个集群无Slave,全是Master,例如2个Master或者3个Master

优点:配置简单,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢)性能最高

缺点:可能会有少量消息丢失(配置相关),单台机器重启或宕机期间,该机器下未被消费的消息在机器恢复前不可订阅,影响消息实时性

3,多Master多Slave,每个Master配一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟,毫秒级

优点:性能同多Master几乎一样,实时性高,主备间切换对应用透明,不需人工干预 缺点:Master宕机或磁盘损坏时会有少量消息丢失

4,多Master多Slave,每个Master配一个Slave,有多对Master-Slave,HA采用同步双写方式,主备都写成功,向应用返回成功

优点:服务可用性与数据可用性非常高

缺点:性能比异步HA略低,3.1.4版本主宕备不能自动切换为主

Master和Slave的配置文件参考conf目录下的配置文件 Master与Slave通过指定相同的brokerName参数来配对,Master的BrokerId必须是0,Slave的BrokerId必须是大于0的数

一个Master下面可以挂载多个Slave,同一Master下的多个Slave通过指定不同的BrokerId来区分

1) 安装搭建环境:

软件安装包:

jdk-7u67-linux-x64.tar.gz alibaba-rocketmq-3.1.8.tar.gz

1、 解压

2、 可执行权限:chmod +x mqadmin mqbroker mqfiltersrv mqshutdown mqnamesrv 3、 启动mqnamesrv:mqnamesrv >/home/sre/alibaba-rocketmq/log/ns.log & 4、 启动mqbroke:mqbroker >/home/sre/alibaba-rocketmq/log/mq.log &

关闭:

mqshutdown namesrv mqshutdown broker

9

创建 top

sh mqadmin updateTopic -b 10.77.144.160:10911 -n 10.77.144.160:9876 -t top01 sh mqadmin updateTopic -cDefaultCluster -n 10.1.169.16:9876 -t top02

sh mqadmin updateTopic -b 10.1.169.238:10911 -n 10.1.169.16:9876 -t top03

注:http://blog.csdn.net/zhu_tianwei/article/details/40951301命令整理

查询topic列表

sh mqadmin topicList –n 127.0.0.1:9876

删除topic

sh mqadmin deleteTopic –n 10.1.169.16:9876 –c group_name1–t top003

sh mqadmin updateSubGroup–b 10.1.169.238:10911–g gn1 –n 10.1.169.16:9876

broker地址: #Cluster Name DefaultCluster #Broker Name WDDS-DEV-016 #BID #Addr 0 #Version #InTPS 0.00 #OutTPS 0.00 10.1.169.238:10911 V3_1_3 删除group

sh mqadmin deleteSubGroup -b 10.1.169.238:10911 -g please_rename_unique_group_name_4 -n 192.168.1.101:9876

2) Demo测试

Push推送模式:PushConsumer.java Pull拉取模式:PullConsumer.java

基本概念:

Topic:消息的逻辑管理单位;

Queue:消息的物理管理单位,一个topic下可以有多个queue,Queue的引入使得消息存储可以分布式集群化,

事务消息:这样的消息有多个状态,并且其发送是两个阶段,第一个阶段发送PREPARED状态的消息,此时consumer是看不见这种状态的消息的,发送完毕后回调用户的TransactionExecutor接口,执行相应的事务操作,当事务操作成功时,则对此条消息返回commint,让broker对该消息执行commit操作,成为commit状态的消息对consumer是可见的。

10

本文来源:https://www.bwwdw.com/article/cd5f.html

Top