一、RocketMQ存储概要设计
RMQ主要存储的文件包括commitlog文件、consumeQueue文件、IndexFile文件。
- CommitLog是消息存储文件,所有消息主题的消息都存储在CommitLog文件中;
- ConsumeQueue是消息消费队列文件,消息达到commitlog文件后将被异步转发到消息消费队列,供消息消费者消费;
- IndexFile是消息索引文件,主要存储的是key和offset的对应关系。
RocketMQ的数据流向如下图所示:
二、RocketMQ存储文件
1、磁盘存储文件
- abort:RocketMQ 启动时生成,正常关闭时删除,如果启动时存在该文件,代表 RocketMQ 被异常关闭
- checkpoint:文件检查点,存储 commitlog 、consumequeue、indexfile 最后一次刷盘时间或时间戳
- index:消息索引文件存储目录
- consumequeue:消息消费队列存储目录
- commitlog:消息存储目录
- config:运行时的配置信息,包含主席消息过滤信息、集群消费模式消息消费进度、延迟消息队列拉取进度、消息消费组配置信息、topic配置属性等
RocketMQ的存储文件(CommitLog、ConsumeQueue、IndexFile)都被设计为固定长度,如果一个文件写满以后再创建一个新文件,文件名就为该文件第一条消息对应的全局物理偏移量。
1、CommitLog文件
文件名为 20 位数字组织,以该文件第一条消息的偏移量为文件名,长度不足 20 的在前面补 0。文件默认大小为 1G,可根据 mappedFileSizeCommitLog 属性改变文件大小。
存储所有消息内容,写满一个文件后生成新的 commitlog 文件。所有 topic 的数据存储在一起,逻辑视图如下:
1 ) TOTALSIZE:该消息条目总长度,4字节
2 ) MAGICCODE:魔数,4字节,固定值0xdaa320a7
3 ) BODYCRC:消息体crc校验码,4字节
4 ) QUEUEID:消息消费队列ID,4字节
5 ) FLAG:消息FLAG,RocketMQ不做处理,供应用程序使用,默认4字节
6 ) QUEUEOFFSET :消息在消息消费队列的偏移量,8字节
7 ) PHYSICALOFFSET:消息在 CommitLog文件中的偏移量,8字节
8 ) SYSFLAG:消息系统 Flag ,例如是否压缩、是否是事务消息等,4字节
9 ) BORNTIMESTAMP:消息生产者调用消息发送API的时间戳, 8字节
10 ) BORNHOST:消息发送者IP、端口号,8字节
11 ) STORETIMESTAMP: 消息存储时间戳,8字节
12 ) STOREHOSTADDRESS: Broker 服务器 IP+端口号,8字节
13 ) RECONSUMETIMES:消息重试次数,4字节
14 ) Prepared Transaction Offset:事务消息物理偏移量,8字节
15 ) BodyLength :消息体长度,4字节
16 ) Body:消息体内容,长度为 bodyLength 中存储的值
17 ) TopieLength:主题存储长度,1字节,表示主题名称最多 255 字符
18 ) Topic: 主题,长度为 TopieLength 中存储的值
19 ) PropertiesLength:消息属性长度,2字节,表示消息属性长度不能超过 65536个字符
20 ) Properties:消息属性,长度为 PropertiesLength 中存储的值
消息的查找实现
1)、首先获取当前commitlog目录的最小偏移量:首先获取目录下的第一个文件,如果该文件可用,则返回该文件的起始偏移量,否则返回下一个文件的起始偏移量;
2)、根据该offset返回下一个文件的起始偏移量:后获取一个文件的大小,减去(offset%MappedFileSize)其目的是回到下一文件的起始偏移量;
3)、根据偏移量与消息长度查找消息:首先根据偏移找到所在的物理偏移量,然后用offset与文件长度取余得到在文件内的偏移量,从该偏移量读取size长度的内容返回即可。如果只根据消息便宜查找消息,则首先找到文件内的偏移量,然后尝试读取4个字节获取消息的实际长度,最后读取指定字节即可。
2、ConsumeQueue文件
ComsumeQueue可以看成是CommitLog关于消息消费的“索引”文件,ConsumeQueue的第一级目录为消息主题,第二级目录为主题的消息队列:
为了加速ConsumeQueue消息条目的检索速度与节省磁盘空间,ConsumeQueue中的存储单元是一个20 字节定长的数据,包含8字节的commitlogoffset(这条消息在commitlog文件的实际偏移量)、4字节的消息大小、以及消息的hash值。
3、Index索引文件
为了提高根据主题与消息队列检索消息的速度,RocketMQ 引入了 Hash 索引机制为消息建立索引,对 CommitLog 进行数据索引,这里借用了HashMap的思想:
索引文件布局如下:
1)、IndexHeader头部,包含40个字节,记录该IndexFile的统计信息,其结构如下:
- beginTimestamp:包含消息的最小存储时间
- endTimestamp: 包含消息的最大存储时间
- beginPhyoffset:包含消息的最小物理偏移量(commitlog文件偏移量)
- endPhyoffset:包含消息的最大物理偏移量(commitlog文件偏移量)
- hashslotCount:hashslot个数,使用意义不大
- indexCount:Index条目列表当前已使用的个数
2)、Hash槽,一个IndexFile默认包含500万个Hash槽,每个hash槽存储的是落在该hash槽的hashcode最新的Index的索引
3)、Index条目列表,默认一个索引文件包含2000万个条目,每一个Index条目结构如下:
- hashCode:key的hashcode
- phyoffset:消息对应的物理偏移量
- timedif:该消息村粗时间与第一条消息的时间戳的差值,小于0该消息无效
- preIndexNo:该条目的前一条记录的Index索引,当出现hash冲突时,构建的链表结构
4、CheckPoint文件
记录 CommitLog,ConsumeQueue,IndexFile 的刷盘时间点,文件固定长度为 4k,其中只用该文件的前 24个字节,存储格式如下:
MessageStore中存储的消息除了通过ConsumeQueue提供给consumer消费之外,还支持通过MessageID或者MessageKey来查询消息;使用ID查询时,因为ID就是用broker+offset生成的(这里msgId指的是服务端的),所以很容易就找到对应的commitLog文件来读取消息。对于用MessageKey来查询消息,MessageStore通过构建一个index来提高读取速度。
如果消费者直接基于commitlog 进行消费的话,简直就是一个恶梦,因为不同的主题的消息完全顺序的存储在 commitlog 文件中,根据主题去查询消息,不得不遍历整个 commitlog 文件,显然作为一款消息中间件这是绝不允许的。RocketMQ 的ConsumeQueue 文件就是来解决消息消费的。首先我们知道,一个主题,在 broker 上可以分成多个消费对列,默认为4个,也就是消费队列是基于主题+broker。那 ConsumeQueue 中当然不会再存储全量消息了,而是存储为定长(20字节,8字节commitlog 偏移量+4字节消息长度+8字节tag hashcode),消息消费时,首先根据 commitlog offset 去 commitlog 文件组(commitlog每个文件1G,填满了,另外创建一个文件),找到消息的起始位置,然后根据消息长度,读取整条消息。但问题又来了,如果我们需要根据消息ID,来查找消息,consumequeue 中没有存储消息ID,如果不采取其他措施,又得遍历 commitlog文件了,为了解决这个问题,rocketmq 的 index 文件又派上了用场。