Kafka's Topic Creation Flow (Part 1, in progress)


Kafka version: 2.6

After we create a topic, what does Kafka do under the hood? What metadata is generated, and where is it stored? How are a topic's partitions distributed across the nodes?

Let's create a topic with a replication factor of 1 and a single partition from the command line, look at what appears on disk first, and then analyze the flow.

Kafka log directory

Each partition maps to one directory on disk, containing the log file and its index files (the .index and .timeindex files are preallocated, which is why they already show multi-megabyte sizes below):

(base) user@userdeMacBook-Air-10 bin % ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1
--partitions 1 --topic test
Created topic test.
(base) user@userdeMacBook-Air-10 bin % ls -l /tmp/kafka-logs/test-0
total 8
-rw-r--r-- 1 user wheel 10485760 Oct 14 10:33 00000000000000000000.index
-rw-r--r-- 1 user wheel 0 Oct 14 10:33 00000000000000000000.log
-rw-r--r-- 1 user wheel 10485756 Oct 14 10:33 00000000000000000000.timeindex
-rw-r--r-- 1 user wheel 8 Oct 14 10:33 leader-epoch-checkpoint
(base) user@userdeMacBook-Air-10 bin %

What did ZooKeeper generate?

The topic's configuration is recorded (empty here, since we passed no config overrides):

localhost:2181	$	get /config/topics/test
{"version":1,"config":{}}
cZxid = 0x88d
ctime = Mon Oct 14 10:33:38 CST 2024
mZxid = 0x88d
mtime = Mon Oct 14 10:33:38 CST 2024
pZxid = 0x88d
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 25
numChildren = 0
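
Besides the config znode, ZooKeeper also stores the partition assignment. Reading /brokers/topics/test should return roughly the following (a hedged sketch: the exact field order may vary, and the version-2 format with adding_replicas/removing_replicas dates from Kafka 2.4; stat output omitted):

localhost:2181	$	get /brokers/topics/test
{"version":2,"partitions":{"0":[0]},"adding_replicas":{},"removing_replicas":{}}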

Kafka AdminClient -> broker

Kafka's command-line tooling follows a client-server architecture: an AdminClient first validates the request locally, and once that preliminary validation passes, it sends the request to a broker for processing. This post focuses on the broker side; the AdminClient is only covered briefly.

AdminClient

The AdminClient-side source lives at ~/kafka-prj/core/src/main/scala/kafka/admin/TopicCommand.scala. All the functionality behind the kafka-topics.sh command is implemented in this file, which contains the TopicCommand class.

Version 2.6 still supports going through ZooKeeper directly: the command requires you to pick either --zookeeper or --bootstrap-server (exactly one), and the two take different code paths. Here we focus on AdminClientTopicService.
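
As a point of reference, the same thing the CLI does can be done directly against the public AdminClient API that TopicCommand wraps. A minimal sketch, assuming a broker at localhost:9092 and reusing the topic name from the example above:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

object CreateTopicExample {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    val admin = AdminClient.create(props)
    try {
      // 1 partition, replication factor 1, same as the CLI example above
      val newTopic = new NewTopic("test", 1, 1.toShort)
      // createTopics is asynchronous; all().get() blocks until the broker responds
      admin.createTopics(Collections.singleton(newTopic)).all().get()
      println("Created topic test.")
    } finally admin.close()
  }
}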

def main(args: Array[String]): Unit = {
  val opts = new TopicCommandOptions(args)
  opts.checkArgs()

  // Choose the ZooKeeper path or the AdminClient (broker) path
  // based on the command-line options
  val topicService = if (opts.zkConnect.isDefined)
    ZookeeperTopicService(opts.zkConnect)
  else
    AdminClientTopicService(opts.commandConfig, opts.bootstrapServer)

  var exitCode = 0
  try {
    if (opts.hasCreateOption)
      // For --create, this calls AdminClientTopicService.createTopic
      topicService.createTopic(opts)
  }
  // ...
}
override def createTopic(topic: CommandTopicPartition): Unit = {
  // Validate the replication factor range
  if (topic.replicationFactor.exists(rf => rf > Short.MaxValue || rf < 1))
    throw new IllegalArgumentException(s"The replication factor must be between 1 and ${Short.MaxValue} inclusive")
  // Validate the partition count: only a lower bound, no upper bound
  if (topic.partitions.exists(partitions => partitions < 1))
    throw new IllegalArgumentException(s"The partitions must be greater than 0")

  try {
    // hasReplicaAssignment: was a manual partition-to-broker assignment given?
    val newTopic = if (topic.hasReplicaAssignment)
      new NewTopic(topic.name, asJavaReplicaReassignment(topic.replicaAssignment.get))
    else {
      // Default creation path: the broker assigns replicas automatically
      new NewTopic(
        topic.name,
        topic.partitions.asJava,
        topic.replicationFactor.map(_.toShort).map(Short.box).asJava)
    }

    val configsMap = topic.configsToAdd.stringPropertyNames()
      .asScala
      .map(name => name -> topic.configsToAdd.getProperty(name))
      .toMap.asJava

    newTopic.configs(configsMap)
    // Send to the broker and wait for the result (synchronous)
    val createResult = adminClient.createTopics(Collections.singleton(newTopic))
    createResult.all().get()
    // This is the echo we saw after running the command
    println(s"Created topic ${topic.name}.")
  } catch {
    // ...
  }
}
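
The hasReplicaAssignment branch above maps to the NewTopic constructor that takes an explicit partition-to-brokers map. A minimal sketch of what that constructor expects (the topic name and broker id here are illustrative):

import java.util.{Arrays => JArrays, HashMap => JHashMap}
import org.apache.kafka.clients.admin.NewTopic

// partitionIndex -> ordered list of replica broker ids; the first id becomes
// the preferred leader. Here: one partition whose single replica is on broker 0.
val assignment = new JHashMap[Integer, java.util.List[Integer]]()
assignment.put(0, JArrays.asList(Integer.valueOf(0)))
val manualTopic = new NewTopic("test-manual", assignment)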

The network send path is omitted here (it is a big topic in its own right); the request is ultimately routed to the controller node.

Broker

The request is ultimately handled by handleCreateTopicsRequest, which performs further validation: whether this broker is the controller node, authentication and authorization (is the caller allowed to create topics), whether the topic already exists, and so on.

def handleCreateTopicsRequest(request: RequestChannel.Request): Unit = {
  // ...

  val createTopicsRequest = request.body[CreateTopicsRequest]
  val results = new CreatableTopicResultCollection(createTopicsRequest.data.topics.size)

  if (!controller.isActive) {
    // This broker is not the controller: every topic in the request gets an error
  } else {
    createTopicsRequest.data.topics.forEach { topic =>
      results.add(new CreatableTopicResult().setName(topic.name))
    }
    // ... (authorization and duplicate checks)

    // Callback invoked once creation completes (or fails)
    def handleCreateTopicsResults(errors: Map[String, ApiError]): Unit = {
      errors.foreach { case (topicName, error) =>
        val result = results.find(topicName)
        result.setErrorCode(error.error.code)
          .setErrorMessage(error.message)
        // Reset any configs in the response if Create failed
        if (error != ApiError.NONE) {
          result.setConfigs(List.empty.asJava)
            .setNumPartitions(-1)
            .setReplicationFactor(-1)
            .setTopicConfigErrorCode(0.toShort)
        }
      }
      sendResponseCallback(results)
    }
    // Pass the callback in; adminManager.createTopics holds the core creation logic
    adminManager.createTopics(createTopicsRequest.data.timeoutMs,
      createTopicsRequest.data.validateOnly,
      toCreate,
      authorizedForDescribeConfigs,
      handleCreateTopicsResults)
  }
}
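
Note the validateOnly flag forwarded to adminManager.createTopics: a client can set it so that the broker runs all of these checks without actually creating anything. A minimal sketch using the public API (reusing the adminClient from the earlier example):

import java.util.Collections
import org.apache.kafka.clients.admin.{CreateTopicsOptions, NewTopic}

// Dry run: the broker validates the request but creates nothing
val options = new CreateTopicsOptions().validateOnly(true)
adminClient.createTopics(
  Collections.singleton(new NewTopic("test", 1, 1.toShort)), options
).all().get() // completes normally only if the creation would have succeeded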

createTopics

def createTopics(timeout: Int,
                 validateOnly: Boolean,
                 toCreate: Map[String, CreatableTopic],
                 includeConfigsAndMetatadata: Map[String, CreatableTopicResult],
                 responseCallback: Map[String, ApiError] => Unit): Unit = {

  // 1. map over topics creating assignment and calling zookeeper
  // Fetch the live brokers from the in-memory metadata cache
  val brokers = metadataCache.getAliveBrokers.map { b => kafka.admin.BrokerMetadata(b.id, b.rack) }
  val metadata = toCreate.values.map(topic =>
    try {
      // Does the topic already exist in the metadata cache?
      if (metadataCache.contains(topic.name))
        throw new TopicExistsException(s"Topic '${topic.name}' already exists.")
      // Collect the topic-level configs
      val configs = new Properties()
      topic.configs.forEach { entry =>
        configs.setProperty(entry.name, entry.value)
      }
      LogConfig.validate(configs)
      // numPartitions/replicationFactor and a manual assignment are mutually exclusive
      if ((topic.numPartitions != NO_NUM_PARTITIONS || topic.replicationFactor != NO_REPLICATION_FACTOR)
          && !topic.assignments().isEmpty) {
        throw new InvalidRequestException("Both numPartitions or replicationFactor and replicasAssignments were set. " +
          "Both cannot be used at the same time.")
      }
      // ... (resolve defaults for numPartitions / replicationFactor, omitted)
      // Decide which brokers each partition's replicas are placed on
      val assignments = if (topic.assignments().isEmpty) {
        AdminUtils.assignReplicasToBrokers(
          brokers, resolvedNumPartitions, resolvedReplicationFactor)
      } else {
        val assignments = new mutable.HashMap[Int, Seq[Int]]
        // Note: we don't check that replicaAssignment contains unknown brokers - unlike in add-partitions case,
        // this follows the existing logic in TopicCommand
        topic.assignments.forEach { assignment =>
          assignments(assignment.partitionIndex) = assignment.brokerIds.asScala.map(a => a: Int)
        }
        assignments
      }
      // A custom implementation of org.apache.kafka.server.policy.CreateTopicPolicy
      // can enforce creation rules; plug it in via create.topic.policy.class.name=<your class>
      createTopicPolicy match {
        case Some(policy) =>
          adminZkClient.validateTopicCreate(topic.name, assignments, configs)

          // ... (policy.validate is invoked here, omitted)
          // Write the topic data into ZooKeeper; createTopicWithAssignment is analyzed below
          if (!validateOnly)
            adminZkClient.createTopicWithAssignment(topic.name, configs, assignments)
        case None =>
          if (validateOnly)
            adminZkClient.validateTopicCreate(topic.name, assignments, configs)
          else
            adminZkClient.createTopicWithAssignment(topic.name, configs, assignments)
      }

      // For responses with DescribeConfigs permission, populate metadata and configs
      includeConfigsAndMetatadata.get(topic.name).foreach { result =>
        val logConfig = LogConfig.fromProps(KafkaServer.copyKafkaConfigToLog(config), configs)
        val createEntry = createTopicConfigEntry(logConfig, configs, includeSynonyms = false, includeDocumentation = false)(_, _)
        val topicConfigs = logConfig.values.asScala.map { case (k, v) =>
          val entry = createEntry(k, v)
          val source = ConfigSource.values.indices.map(_.toByte)
            .find(i => ConfigSource.forId(i.toByte) == entry.source)
            .getOrElse(0.toByte)
          new CreatableTopicConfigs()
            .setName(k)
            .setValue(entry.value)
            .setIsSensitive(entry.isSensitive)
            .setReadOnly(entry.isReadOnly)
            .setConfigSource(source)
        }.toList.asJava
        result.setConfigs(topicConfigs)
        result.setNumPartitions(assignments.size)
        result.setReplicationFactor(assignments(0).size.toShort)
      }
      CreatePartitionsMetadata(topic.name, assignments.keySet, ApiError.NONE)
    } catch {
      // On any failure, record the error in the per-topic metadata
      case e: Throwable =>
        error(s"Error processing create topic request $topic", e)
        CreatePartitionsMetadata(topic.name, Set.empty, ApiError.fromThrowable(e))
    }).toBuffer

  // ... (delayed-response handling omitted)
}
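
AdminUtils.assignReplicasToBrokers is what answers the question from the top of this post: how partitions end up spread across nodes. In the rack-unaware case, it places each partition's first replica one broker further along than the previous partition's, and puts the remaining replicas on subsequent brokers so replicas of one partition never collide. A simplified, illustrative sketch of the idea (not Kafka's exact code; the real version also randomizes the start index and the follower shift):

// Simplified round-robin replica assignment, rack-unaware
def assignReplicas(brokerIds: Seq[Int],
                   numPartitions: Int,
                   replicationFactor: Int,
                   startIndex: Int = 0): Map[Int, Seq[Int]] = {
  val n = brokerIds.size
  require(replicationFactor <= n, "replication factor cannot exceed the broker count")
  (0 until numPartitions).map { partition =>
    // the first replica (preferred leader) advances one broker per partition
    val first = (startIndex + partition) % n
    // followers go on the next brokers around the ring
    val replicas = (0 until replicationFactor).map(j => brokerIds((first + j) % n)).toList
    partition -> replicas
  }.toMap
}

// 3 brokers, 3 partitions, replication factor 2:
// Map(0 -> List(0, 1), 1 -> List(1, 2), 2 -> List(2, 0))
println(assignReplicas(Seq(0, 1, 2), numPartitions = 3, replicationFactor = 2))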

createTopicWithAssignment

This writes the topic's configuration into ZooKeeper (the content we saw earlier under /config/topics/), and the partition assignment under /brokers/topics/[topic-name]:

def createTopicWithAssignment(topic: String,
                              config: Properties,
                              partitionReplicaAssignment: Map[Int, Seq[Int]]): Unit = {
  // Validate the topic name, assignment, and configs
  validateTopicCreate(topic, partitionReplicaAssignment, config)

  // write out the config if there is any, this isn't transactional with the partition assignments
  // Write the topic configs into ZooKeeper (/config/topics/<topic>)
  zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic, config)

  // create the partition assignment
  // Write the replica assignment to /brokers/topics/<topic>
  writeTopicPartitionAssignment(topic, partitionReplicaAssignment.map { case (k, v) => k -> ReplicaAssignment(v) },
    isUpdate = false)
}

At this point the data in ZooKeeper has been updated. The controller node watches ZooKeeper for changes (the watch is registered by TopicChangeHandler); when the children of /brokers/topics change, the corresponding handler logic runs, which here is the processTopicChange function.
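
The watch mechanism itself is plain ZooKeeper: a one-shot child watch on /brokers/topics that must be re-registered every time it fires. A standalone sketch with the raw ZooKeeper client (hypothetical demo code, not the controller's actual classes):

import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
import org.apache.zookeeper.Watcher.Event.EventType

// Hypothetical demo: print the topic list whenever /brokers/topics changes
object TopicWatchDemo extends Watcher {
  private val zk = new ZooKeeper("localhost:2181", 30000, this)

  override def process(event: WatchedEvent): Unit = {
    if (event.getType == EventType.NodeChildrenChanged) {
      // ZooKeeper watches are one-shot: re-register while reading the children,
      // just as the controller re-registers its handler
      val topics = zk.getChildren("/brokers/topics", this)
      println(s"topics now: $topics")
    }
  }

  def main(args: Array[String]): Unit = {
    zk.getChildren("/brokers/topics", this) // set the initial watch
    Thread.sleep(Long.MaxValue)
  }
}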

processTopicChange

private def processTopicChange(): Unit = {
  if (!isActive) return
  // Fetch all topics from ZooKeeper
  val topics = zkClient.getAllTopicsInCluster(true)
  // Diff against the controller's cached topic set to find added and deleted topics
  val newTopics = topics -- controllerContext.allTopics
  val deletedTopics = controllerContext.allTopics.diff(topics)
  controllerContext.setAllTopics(topics)

  registerPartitionModificationsHandlers(newTopics.toSeq)
  // Fetch the partition replica assignments of the new topics
  val addedPartitionReplicaAssignment = zkClient.getFullReplicaAssignmentForTopics(newTopics)

  deletedTopics.foreach(controllerContext.removeTopic)
  addedPartitionReplicaAssignment.foreach {
    case (topicAndPartition, newReplicaAssignment) => controllerContext.updatePartitionFullReplicaAssignment(topicAndPartition, newReplicaAssignment)
  }
  info(s"New topics: [$newTopics], deleted topics: [$deletedTopics], new partition replica assignment " +
    s"[$addedPartitionReplicaAssignment]")
  // If any new assignments were read from ZooKeeper, enter the state machine
  // flow; onNewPartitionCreation is the core of topic creation
  if (addedPartitionReplicaAssignment.nonEmpty)
    onNewPartitionCreation(addedPartitionReplicaAssignment.keySet)
}
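
The added/deleted detection above is plain set arithmetic on topic names, not a count comparison. For example, with illustrative values:

val cached  = Set("old-topic", "kept-topic")   // controllerContext.allTopics
val current = Set("kept-topic", "test")        // freshly read from ZooKeeper
val newTopics     = current -- cached          // Set(test)
val deletedTopics = cached.diff(current)       // Set(old-topic)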

onNewPartitionCreation

This enters the state machine flow, involving both the replica state machine and the partition state machine (a deeper dive into the two is left for a later post). Each partition is driven NonExistentPartition → NewPartition → OnlinePartition, and each replica NonExistentReplica → NewReplica → OnlineReplica:

private def onNewPartitionCreation(newPartitions: Set[TopicPartition]): Unit = {
  info(s"New partition creation callback for ${newPartitions.mkString(",")}")
  // NewPartition: the partition has a replica assignment but no leader/ISR yet;
  // its valid previous state is NonExistentPartition
  partitionStateMachine.handleStateChanges(newPartitions.toSeq, NewPartition)
  // NewReplica: in this state the replica can only act as a follower;
  // its valid previous state is NonExistentReplica
  replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, NewReplica)

  // Elect leaders and move the partitions to OnlinePartition
  partitionStateMachine.handleStateChanges(
    newPartitions.toSeq,
    OnlinePartition,
    Some(OfflinePartitionLeaderElectionStrategy(false))
  )
  // Transition the replicas to OnlineReplica
  replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, OnlineReplica)
}

… The intermediate steps are also left for a later post.

Finally, the controller sends LeaderAndIsrRequest to the brokers hosting the new replicas; each broker then creates its replica locally, producing the partition directory we saw at the beginning.