本文作者:刘涛,阿里云智能技术专家。
(资料图片仅供参考)
01Compaction Topic介绍
一般来说,消息队列提供的数据过期机制有如下几种,比如有基于时间的过期机制——数据保存多长时间后即进行清理,也有基于数据总量的过期机制——数据分区数据量达到一定值后进行清理。
而 Compaction Topic 是一种基于 key 的数据过期机制,即对于相同 key 的数据只保留最新值。
该特性的应用场景主要为维护状态信息,或者在需要用到 KV 结构时,可以通过 Compaction Topic 将 key-value 信息直接保存到 MQ,从而解除对外部数据库的依赖。比如维护消费位点,可以将消费组加分区作为 key ,将消费位点做 offset ,以消息形式发送到 MQ ,压缩之后,消费时获取最新 offset 信息即可。另外,像 connect 里的 source 信息比如 Binlog 解析位点或其他 source 处理的位点信息均可存到 Compaction Topic。同时 Compaction Topic 也支持 存储 RSQLDB 与 RStreams 的 checkpoint 信息。
02需要解决的问题
Compaction 过程中,需要解决如下几个问题:
第一,数据写入过程中,数据如何从生产者发送到 broker 并且最终落盘,数据主备之间的 HA 如何保证?
第二,整个 compaction 的流程包括哪几个步骤?如果数据量太大,如何优化?
第三,数据消费时如何索引消息?如果找不到消息指定的 offset 消息,如何处理?
第四,如果有机器故障,如何恢复老数据?
03方案设计与实现
第一,数据如何写入。
首先写入到 CommitLog,主要为复用 CommitLog 本身的 HA 能力。然后通过 reput线程将 CommitLog 消息按照 Topic 加 partition 的维度拆分到不同文件里,按分区整理消息,同时生成索引。这样最终消息就按 Topic 加 partition的粒度做了规整。
在 compaction 过程中,为什么不在原先的 commitLog 上做规整,而是再额外按分区做规整?原因如下:
所有数据都会写到 CommitLog ,因此单个 Topic 的数据不连续。如果要遍历单个 topic 的所有数据,可能需要跳着读,这样就会导致大量冷读,对磁盘 IO 影响比较大。CommitLog 数据有自动过期机制,会将老数据删除,因此不能将数据直接写到 CommitLog,而 CompactionLog 里的老数据为按 key 过期,不一定会删除。compact 以分区为维度进行。如果多个分区同时做 compact ,效率较低。因为很多分区的 key 同时在一个结构里,会导致同一个分区能够 compact 的数据比较少, 并且 compact 之后也需要重新写一份么,因此,索性就在 compact 之前将消息通过 reput service 重新归整一遍。Compact 流程如下:
第一步,确定需要做 compaction 的数据文件列表。一般大于两个文件,需要排除当前正在写的文件。
第二步,将上一步筛选出的文件做遍历,得到 key 到 offset 的映射关系。
第三步,根据映射关系将需要保留的数据重新写到新文件。
第四步,用新文件替换老文件,将老文件删除。
第二步的构建 OffsetMap 主要目的在于可以知道哪文件需要被保留、哪文件需要被删除,以及文件的前后关系,这样就可以确定写入的布局,确定布局之后,就可以按照append 的方式将需要保留的数据写到新文件。
此处记录的并非 key 到 value 的信息,而是 key 到 Offset 的信息。因为 value 的数据 body 可能较长,比较占空间,而 offset 是固定长度,且通过 offset 信息也可以明确消息的先后顺序。另外,key 的长度也不固定,直接在 map 存储原始 key 并不合适。因此我们将 MD5 作为新 key ,如果 MD5 相同 key 认为也相同。
做 compaction 时会遍历所有消息,将相同 key 且 offset 小于 OffsetMap 的值删除。最终通过原始数据与 map 结构得到压缩之后的数据文件。
上图为目录结构展示。写入时上面为数据文件,下面为索引,要 compact 的是标红两个文件。压缩后的文件存储于子目录,需要将老文件先标记为删除,将子目录文件与 CQ 同时移到老的根目录。注意,文件与 CQ 文件名一一对应,可以一起删除。
随着数据量越来越大,构建的 OffsetMap 也会越来越大,导致无法容纳。
因此不能使用全量构建方式,不能将所有要 compact 的文件的 OffsetMap 一次性构建,需要将全量构建改为增量构建,构建逻辑也会有小的变化。
第一轮构建:如上图,先构建上面部分的 OffsetMap ,然后遍历文件,如果 offset 小于 OffsetMap 中对应 key 的 offset 则删除,如果等于则保留。而下面部分的消息的offset 肯定大于 OffsetMap 内的 offset ,因此也需要保留。
第二轮构建:从上一次结束的点开始构建。如果上一轮中的某个 key 在新一轮中不存在,则保留上一轮的值;如果存在,则依然按照小于删除、大于保留的原则进行构建。
将一轮构建变为两轮构建后, OffsetMap 的大小显著降低,构建的数据量也显著降低。
原先的索引为 CommitLog Position、Message Size 和 Tag Hush,而现在我们复用了bcq 结构。由于 Compact 之后数据不连续,无法按照先前的方式直接查找数据所在物理位置。由于 queueOffset 依然为单调增排列,因此可以通过二分查找方式将索引找出。
二分查找需要 queueoffset 信息,索引结构也会发生变化,而 bcq 带有 queueoffse 信息,因此可以复用 bcq 的结构。
Queueoffset 在 compact 前后保持不变。如果 queueoffset 不存在,则获取第一个大于 queueoffset 的消息,然后从头开始将所有全量数据发送给客户端。
机器故障导致消息丢失时,需要做备机的重建。因为 CommitLog 只能恢复最新数据,而 CompactionLog 需要老数据。之前的 HA 方式下,数据文件可能在 compact 过程中被被删除,因此也不能基于复制文件的方式做主备间同步。
因此,我们实现了基于 message 的复制。即模拟消费请求从 master 上拉取消息。拉取位点一般从 0 开始,大于等于 commitLog 最小offset 时结束。拉取结束之后,再做一次 force compaction 将 CommitLog 数据与恢复时的数据做一次 compaction ,以保证保留的数据是被压缩之后的数据。后续流程不变。
04使用说明
生产者侧使用现有生产者接口,因为要按分区做 compact ,因此需要将相同 key 路由到相同的 MessageQueue,需要自己实现相关算法。
消费者侧使用现有消费者接口,消费到消息后,存入本地类 Map 结构中再进行使用。我们的场景大多为从头开始拉数据,因此需要在开始时将消费位点重置到0。拉取完以后,将消息 key 与 value 传入本地 kv 结构,使用时直接从该结构拿取即可。