banner

Overview

分层存储是 Apache Kafka 3.6.0 提供的一项新的功能,它支持独立扩展计算和存储资源,提供更好的客户端隔离,并允许更快地维护 Kafka 集群。让我们深入了解这项新功能的动机、设计和实施细节。在这篇文章中,我们将重点介绍分层存储的实现,因此我们假定你已经很好地理解了 Kafka 的架构和主要组件。

Tiering

分层的概念在软件行业并不陌生。三层架构是一种成熟的软件应用程序架构,它将应用程序分为三个逻辑和物理计算层:呈现层(或用户界面)、应用层(处理数据)和数据层(存储和管理与应用程序相关的数据)。

分层存储指的是一种存储架构,它根据数据的访问模式、性能要求和成本考虑,使用不同类别或层级的存储来有效地管理和存储数据。这样可以确保最重要和最频繁访问的数据保存在高性能的 “热 “存储(本地)中,而不太重要或不太频繁访问的数据则转移到成本较低、性能较低的 “热 “存储(远程)中。

我们还可以将 “冷 “存储定义为定期备份系统,但这通常是一个单独的流程。这三个层级如下图 1 所示。

Tiers of storage

使用分层存储时,数据在任何时候都停留在一种介质类型上,并随着数据访问模式的变化在介质之间移动。数据不是复制,而是移动到不同的存储介质上,选择最能平衡可用性、性能和存储介质成本的位置。这样,就能更好地利用存储硬件,同时最大限度地提高关键任务应用程序的性能。某些数据可能仍被缓存,但这只是出于性能考虑。

The need for remote storage

Apache Kafka 设计用于从数千个客户端摄取大量数据。到目前为止,一个Topic分区的副本是由多个分段文件(Segment)组成的,而这些分段文件完全托管在 Kafka 代理的单个磁盘上,因此你可以存储的数据量是有限的。

如果要保存历史数据怎么办?这就是需要远程存储的地方。常规主题的历史数据需要存储在某个地方。事件来源和变更数据捕获是两种可能需要历史数据的架构模式。我们可能需要它们来进行灾难恢复、初始化新的应用程序、在修复业务逻辑问题后重建应用程序状态、重新训练机器学习模型。

在 Kafka 外部保存历史数据基本上有两种方法:

  • 通过额外进程将Kafka中的数据全部读取并拷贝至远程存储。这样做的缺点是,数据消费者必须根据数据的年龄构建不同版本的应用程序,以便从不同的系统中读取数据。有时,需要使用 Kafka Connect 来导出和导入数据。
  • 使用运行在 Kafka 机器上的自定义程序,检查不活动的数据段,并将其移动到远程存储中。这些段已达到 segment.bytes 所定义的最大大小,并已关闭和以只读方式重新打开。这种解决方案的优点是无需向 Kafka 回馈重新处理的成本,但需要维护另一个程序。

Tiered storage in Kafka

自定义程序解决方案类似于 Kafka 3.6.0 中作为早期访问发布的新分层存储功能(KIP-405)。这项新功能提供了一种标准化的内置机制,可根据保留配置自动将本地存储(如 SSD/NVMe)中的非活动片段移动到远程存储(如 S3/HDFS),并在客户端要求时从远程存储读回数据。可以通过插件支持各种外部存储系统,这些插件并不是 Kafka 项目的一部分。

分层存储必须在Cluster级别通过 remote.log.storage.system.enable 启用,在Topic级别通过 remote.storage.enable 启用。在Topic级别可以配置两种保留期:本地存储的保留期为 local.log.retention.ms,远程存储的保留期为 log.retention.ms(也可使用基于大小的保留期)。

Kafka Broker 会尽快将数据移动到远程存储,但在某些情况下,如高摄取率(ingestion rate)或连接问题,本地存储中存储的数据可能会多于指定的本地保留策略。任何超过本地保留阈值的数据在成功上传到远程存储之前都不会被清除。只有Leader节点的非活动日志段才会上载到远程存储,而不是Followers,但你仍然可以从Followers那里获取数据。

当消费者请求本地尚未提供但在可用偏移范围内的记录(信息)时,Broker会从远程存储中获取它们,并在本地缓存索引文件。因此建议为此预留磁盘空间(默认为 1 GiB,可通过 remote.log.index.file.cache.total.size.bytes 进行配置)。

分层存储有多种好处:

  • 弹性(Elasticity): 计算和存储资源分离,现在可以独立扩展,由于本地数据减少,某些集群操作会变得更快。
  • 隔离性(Isolation): 对延迟敏感的数据可照常从本地层提供,而历史数据可使用不同的代码路径和线程从远程层提供,无需更改 Kafka 客户端。
  • 成本效益(Cost efficiency): 远程对象存储系统的成本低于本地快速磁盘。Kafka 存储变得更便宜,而且几乎不受限制。

此外还有一些限制,将在今后的版本中加以解决:

  • 不支持多个日志目录 (JBOD) 和压缩Topic(包括转换为常规Topic的压缩Topic)。
  • 要在Topic上禁用分层,需要先将数据传输到另一个Topic或外部存储,然后删除Topic。

一个仍在开发中的重要功能是可以动态设置Broker级配额,以限制我们上传和下载段(segments)的速度(KIP-956)。这对于避免过度使用Broker资源和远程存储性能下降非常有用。例如,在现有Topic上首次启用分层存储时就会出现这种情况。

远程存储操作有几个指标,可用于监控和创建有关远程存储服务的告警(例如,缓慢的远程上传/下载速率、高远程上传/下载错误率、繁忙的上传/下载线程池)。

在 Kafka 3.7.0 中,有新指标可报告每个Topic的段或字节复制和删除滞后情况:

  • kafka.server:type=BrokerTopicMetrics,name=RemoteCopyLagSegments,topic=([-.w]+)
  • kafka.server:type=BrokerTopicMetrics,name=RemoteCopyLagBytes,topic=([-.w]+)
  • kafka.server:type=BrokerTopicMetrics,name=RemoteDeleteLagBytes,topic=([-.w]+)
  • kafka.server:type=BrokerTopicMetrics,name=RemoteDeleteLagSegments,topic=([-.w]+)

Kafka internals

现在,我们将深入了解 Apache Kafka 的内部结构,看看消息是如何产生和消费的。然后,我们将了解分层存储如何与之集成,以及所有组件如何相互配合。如果您已经熟悉 Kafka 存储的内部结构,可以直接跳到分层存储如何工作部分。

How records are stored

让我们先看看记录是如何存储在磁盘上的。在 Kafka 中,我们有两个主要的子系统:由处理用户数据的组件组成的 “数据平面 “和由处理集群元数据(控制器选举、Topic配置和集群)的组件组成的 “控制平面”。如图 2 所示。

Data and control planes

默认情况下,当一批新的记录到达时,它会首先写入操作系统的页面缓存(Page Cache),然后才异步刷新到磁盘。如果 Kafka JVM 因某种原因崩溃,最近的信息仍在页面缓存中,并将被操作系统刷新。但是,这并不能防止机器崩溃时数据丢失。这就是启用主题复制的重要原因:拥有多个复制意味着只有多个代理同时崩溃时,数据才有可能丢失。为了进一步提高容错能力,可以使用机架感知的 Kafka 集群,将Topic副本均匀地分布在同一地理区域的数据中心中。

当应用程序调用 producer.send() 时,生产者会确定目标 Broker。记录可能不会立即发送,而是会根据时间或大小配置属性进行累积并批量发送。Broker 的网络线程之一会接收到 ProduceRequest,该网络线程将请求转换为 ProduceRequest 并传递给 IO 线程。在将数据追加到分区段(日志和索引文件)之前,IO 线程会对请求进行验证。参见图3。

Data plane message flow

默认情况下,只有在生产请求被其他Broker复制后,Broker才会确认该请求。这不会阻塞 IO 线程。复制完成后,Broker会生成一个ProduceResponse。最后,网络线程将该响应发送给生产者。

Kafka 消费者发送 Fetch 请求,指定所需的Topic、分区和偏移量。代理的网络线程会将请求传递给 IO 线程,IO 线程会将偏移量与段索引文件中的相应条目进行比较。这一步决定了从日志文件中读取的确切字节数。

How tiered storage works

日志段(log segment)由记录数据和相关元数据组成,覆盖分区副本中连续的偏移范围。在Broker的磁盘上,这些数据存储在多个文件中,日志本身保存记录批次,索引文件支持对日志的有效访问,等等。分层存储功能包括将这些文件移入或移出远程对象存储。

为了向日志添加数据,IO 线程会通知 ReplicaManager,这就是分层存储的插件。主要组件包括 RemoteLogManager(RLM)RemoteStorageManager(RSM)RemoteLogMetadataManager(RLMM)RSMRLMM 都是可插拔接口,因此可以支持不同类型的远程存储系统。如图 4 所示。

Tiered storage high level architecture

出于性能考虑,使用了两个缓存。由 TopicBasedRemoteLogMetadataManager(默认 RLMM 实现)维护的内存 RemoteLogMetadataCache,其中包含所有未删除的远程分段的元数据。由 RemoteLogManager 维护并存储在 remote-log-index-cache 文件夹中的基于文件的 RemoteIndexCache,有助于避免每次获取调用都从远程存储中重新获取偏移量、时间索引等索引文件。

RemoteStorageManager

RemoteStorageManager 提供了远程日志段的生命周期,包括从远程存储中复制、获取和删除。

RemoteStorageManager 提供的日志段元数据是 RemoteLogSegmentMetadata 实例,它描述了上传到远程存储的日志段。它包含一个普遍唯一的 RemoteLogSegmentId,是已外部化到远程存储的段数据的标识符。RemoteLogSegmentId 由Topic名称、Topic ID、分区编号和 UUID 组成。

日志段数据(LogSegmentData)包含需要存储在远程存储中的特定日志段的实际数据和索引。

这就是带有索引和辅助状态的远程段在远程存储中的样子:

test-0-00000000000000000000-knnxbs3FSRyKdPcSAOQC-w.index
test-0-00000000000000000000-knnxbs3FSRyKdPcSAOQC-w.snapshot
test-0-00000000000000000000-knnxbs3FSRyKdPcSAOQC-w.leader_epoch_checkpoint
test-0-00000000000000000000-knnxbs3FSRyKdPcSAOQC-w.timeindex
test-0-00000000000000000000-knnxbs3FSRyKdPcSAOQC-w.log
test-0-00000000000000001005-SpGkL2YWRMC5DIw5TJP_EA.index
test-0-00000000000000001005-SpGkL2YWRMC5DIw5TJP_EA.leader_epoch_checkpoint
test-0-00000000000000001005-SpGkL2YWRMC5DIw5TJP_EA.log
test-0-00000000000000001005-SpGkL2YWRMC5DIw5TJP_EA.snapshot
test-0-00000000000000001005-SpGkL2YWRMC5DIw5TJP_EA.timeindex

文件名不一定必须是这种格式,它由 RemoteStorageManager 插件定义,但基本上,它通常包含这些信息,以便相互区分:

  • Topic name
  • Partition ID
  • Log segment base offset
  • Remote segment UUID
  • FILE_TYPE

FILE_TYPE 可以是以下类型之一:

  • leader-epoch-checkpoint 文件,其中包含该分区的 leader_epochstart_offset 映射的元数据。
  • 偏移索引文件,包含日志段的偏移索引。
  • 时间索引文件包含日志段的时间索引。
  • 生产者快照文件包含生产者 ID 与某些元数据(如日志段的序列号和纪元)的映射。
  • 可选的事务索引文件包含日志段的中止事务。这是一个可选文件,只有在启用事务生产者时才会存在。

需要注意的是,虽然 KIP-405 中的规范并不强制执行存储在远程存储器中的文件名格式,但在从远程存储器获取数据时,除了可选的事务索引外,任何丢失的文件都会导致异常和获取失败。

RemoteLogMetadataManager

RemoteLogMetadataManager 用于存储关于上传段的 RemoteLogSegmentMetadata,需要一个支持强一致性的存储系统,而有些存储系统只支持最终一致性。

这就是 Kafka 提供基于Topic的 TopicBasedRemoteLogMetadataManager 的原因,它使用名为 __remote_log_metadata 的内部非压缩Topic来存储元数据记录。每个元数据操作都有一个相关的记录类型,该类型定义在 storage/src/main/resources/message 中。

RemoteLogManager

启用分层存储后,Broker会启动 RemoteLogManager,该组件负责维护拥有远程日志段的Topic分区列表。它负责初始化 RemoteStorageManagerRemoteLogMetadataManager,处理有关Leader节点/Follower节点的变更和分区停止的事件,向其他Broker侧组件提供操作以获取有关远程日志段的索引和元数据。

当所有组件启动后,ReplicaManager 会等待副本对任意分区获取领导权(leadership)。

此时,系统会为每个受管理的Topic分区创建一个RLMTask,并通过 RLMScheduledThreadPoolremote.log.manager.task.interval.ms 配置的间隔周期性地调度执行。

当每次调度执行时,RLMTask会执行两个子任务:

  • 将日志段(log segments)复制到远程存储
  • 从远程存储中删除日志段

LogSegment copy

为了确定哪些 LogSegment 符合复制到远端存储的条件,RLMTask 会比较两个偏移量:

  1. 最后稳定偏移量 (LSO)
    • 对于非事务性记录:等于高水位线(high watermark)
    • 对于事务性记录:等于最后提交的偏移量
  2. 最后远端偏移量 (LRO)
    • 表示已存储在远端存储中的最后偏移量
    • 通过 RemoteLogMetadataManager 获取

当 LSO > LRO 时,RLMTask 会检查该偏移量范围内是否存在任何非活跃状态的 segment。这些非活跃 segment 即符合复制到远端存储的条件。该过程的示意图可参考图5。

Sequence diagram for segment copy

LogSegment deletion

远程日志保留机制与本地日志保留机制类似,区别在于日志段存储于远程存储中。其核心逻辑依然是通过 retention.bytesretention.ms 参数删除旧的日志段。

RLMTask 严重依赖 RemoteLogMetadataManager 中存储的元数据来识别待删除的候选日志段。需注意:

  • retention.bytes 的计算范围:包含本地日志段与远程日志段的总大小
  • retention.ms 的判定基准:同时覆盖本地与远程的日志时间戳

Consumer fetch

针对分层存储场景,消费者Fetch请求的处理逻辑发生如下变更。

当所需偏移量数据已迁移至远程存储时,ReplicaManager将抛出OffsetOutOfRangeException。此时系统会将请求暂存至RemoteFetchPurgatory队列,并通过向RemoteStorageThreadPool线程池提交RemoteLogReader任务的方式,将该请求委托给RemoteLogManager处理。

每个RemoteLogReader每次仅处理一个远程Fetch请求。当远程读取请求处于RemoteFetchPurgatory等待队列时,当前系统会复用fetch.max.wait.ms作为延迟超时参数。

这种设计存在明显缺陷,因为该参数原始设计用途是控制当本地无可用数据时Fetch请求的等待时长。根据KIP-1018提案,后续将通过引入远程读取延迟超时(remote fetch delay timeout)专用配置参数来优化此问题,具体架构如图6所示。

Sequence diagram for consumer Fetch request

Follower fetch

对于采用分层存储的Topic,Follower副本的同步逻辑有如下重要调整:当Follower分区副本从Leader获取数据时若触发OffsetMovedToTieredStorageException异常,即表明请求的offset数据已不在Leader本地存储中,此时Follower需基于远端分段元数据(remote segment metadata)重建辅助状态。

副本的辅助状态包括leader-epoch检查点(日志复制协议用它来维护Broker间的日志一致性)和producer-id快照(事务子系统使用它)。

副本会调用带有EarliestLocalTimestamp参数的ListsOffsets API来确定需要重建该状态的截止偏移量。在辅助状态构建完成后,副本将截断本地日志,将拉取偏移量重置为远端日志末端偏移量加1,然后恢复从leader本地存储拉取数据。

Conclusion

我们深入探讨了Kafka采用远端存储的必要性,特别是对于容灾恢复、数据分析和机器学习等关键场景中历史数据的保留需求。

Kafka分层存储的引入提供了一个标准化、内建的解决方案。通过基于数据访问模式、性能需求和成本考量配置不同存储层级,该方案在确保高效数据管理的同时,实现了资源利用的最优化。

我们还解析了支撑远端日志子系统的技术架构,详细阐释了RemoteLogManager(远端日志管理器)、RemoteStorageManager(远端存储管理器)和RemoteLogMetadataManager(远端日志元数据管理器)等组件的功能。

分层存储代表着Kafka生态系统在数据管理领域的重要突破,为现代化分布式架构提供了兼顾存储效率与检索性能的稳健解决方案。