Kafka - Getting Started

1. Introduction

Kafka is a high-throughput distributed publish-subscribe messaging system that can process large volumes of data in real time.

2. Installation

2.1 Installing Kafka with Docker

  1. Install ZooKeeper

docker run --name zookeeper -p 2181:2181 -d zookeeper

  2. Install Kafka (10.66.106.86 is the local host IP)

docker run --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=10.66.106.86:2181 -e ALLOW_PLAINTEXT_LISTENER=yes -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://10.66.106.86:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -d bitnami/kafka:3.0.0

#To mount the Kafka data directory when running Docker on Windows, add the following flag to the run command
-v C:\dockerData\kafka\data:/bitnami/kafka/data

The full set of startup options is documented at https://hub.docker.com/r/bitnami/kafka, in the Configuration section.

  3. Enter the Kafka container

docker exec -it kafka bash
cd opt/bitnami/kafka/bin/

  4. Create a topic

./kafka-topics.sh --create --bootstrap-server 10.66.106.86:9092 --replication-factor 1 --partitions 2 --topic aacopy-topic

  5. List topics

./kafka-topics.sh --list --bootstrap-server 10.66.106.86:9092

  6. Start a console producer

./kafka-console-producer.sh --broker-list 10.66.106.86:9092 --topic aacopy-topic

  7. Open a new terminal window and start a console consumer

./kafka-console-consumer.sh --bootstrap-server 10.66.106.86:9092 --topic aacopy-topic --from-beginning

  8. Type messages into the producer window; they show up in the consumer window.

Append --help to any of these scripts to see the available options, e.g. ./kafka-console-consumer.sh --help

2.2 Kafka-UI

GitHub: https://github.com/provectus/kafka-ui

  • Run directly with Docker

    docker run -p 9090:8080 --name kafka-ui \
    -e DYNAMIC_CONFIG_ENABLED=true \
    -e KAFKA_CLUSTERS_0_NAME=dev \
    -e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=192.168.80.128:9092,192.168.80.128:9093,192.168.80.128:9094 \
    -e KAFKA_CLUSTERS_1_NAME=release \
    -e KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS=192.168.80.129:9092,192.168.80.129:9093,192.168.80.129:9094 \
    -d provectuslabs/kafka-ui:v0.7.1
  • docker-compose

    version: '3.8'

    services:
      kafka-ui:
        container_name: kafka-ui
        image: provectuslabs/kafka-ui:v0.7.1
        ports:
          - 9090:8080
        environment:
          DYNAMIC_CONFIG_ENABLED: true
          AUTH_TYPE: "LOGIN_FORM"
          SPRING_SECURITY_USER_NAME: admin
          SPRING_SECURITY_USER_PASSWORD: aacopy.cn
        volumes:
          - /home/dockerdata/kafka-ui/dynamic_config.yaml:/etc/kafkaui/dynamic_config.yaml
  • Create the file /home/dockerdata/kafka-ui/dynamic_config.yaml on the host beforehand and make it writable with chmod 777 dynamic_config.yaml (see the sketch after this list)

  • Access the UI at <deployment-ip>:9090

  • Add your clusters in the UI
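A possible way to prepare that file on the Docker host (the path is the one used in the compose file above; adjust it to your environment):

    mkdir -p /home/dockerdata/kafka-ui
    touch /home/dockerdata/kafka-ui/dynamic_config.yaml
    chmod 777 /home/dockerdata/kafka-ui/dynamic_config.yaml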

3. Kafka Concepts

  • Broker

    The Kafka server process; each MQ node is one broker.

  • Producer

    A message producer; it creates messages and sends them to a topic.

  • Consumer

    A message consumer.

  • ConsumerGroup

    A consumer group. A broadcast message can be delivered to multiple consumer groups, but within a single group only one consumer will consume it.

  • Topic

    A topic, i.e. a category of messages.

  • Partition

    A physical subdivision of a topic and the basic unit of data storage. Each partition is an ordered, immutable sequence of messages that are continually appended to it. A topic has at least one partition, and multiple partitions are spread across the servers of the cluster. The number of partitions should be greater than or equal to the number of consumers in a single consumer group; if a group has more consumers than partitions, some consumers will receive no data. This is why, when dealing with a message backlog, simply adding more consumers does not help.

  • Replication

    Replicas. A partition can have multiple replicas, each holding the same data, used for failover when a broker goes down so the service stays available. There are no extra replicas by default, and the replica count cannot exceed the number of nodes in the cluster.

  • ReplicationLeader

    There is exactly one leader replica; it handles all interaction with producers and consumers.

  • ReplicationFollower

    Acts only as a backup and synchronizes data from the leader.

  • ReplicationManager

    Responsible for replica state transitions.

  • Segment

    Physically, a partition is made up of multiple segments.

  • offset

    Each partition is an ordered, immutable sequence of messages that are continually appended to it. Every message in a partition has a sequential id called the offset, which uniquely identifies the message within that partition.

4. Kafka-SpringBoot

Official site: https://spring.io/projects/spring-kafka

Reference docs: https://docs.spring.io/spring-kafka/docs/current/reference/html/

4.1 Minimal Integration

  1. Environment: Spring Boot 2.6.1, Kafka 3.0.0
  2. Add the Maven dependency

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
  3. Run with the default configuration, the simplest possible setup

@SpringBootApplication
public class KafkaLearnApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaLearnApplication.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("topic1")
                .partitions(10)
                .replicas(1)
                .build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            for (int i = 0; i < 100; i++) {
                System.out.println("生产者发送消息:test" + i);
                template.send("topic1", "test" + i);
            }
        };
    }

    @KafkaListener(id = "myId", topics = "topic1")
    public void listen(String in) {
        System.out.println("消费者消费消息:" + in);
    }
}

4.2 Spring Web

Message producer: KafkaProducer

@Component
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> template;
    @Value("${kafka_topic}")
    private String topic;

    public void sendMsg(String msg) {
        template.send(topic, msg);
    }
}

Message consumer: KafkaConsumer

@Component
public class KafkaConsumer {

    @KafkaListener(topics = "${kafka_topic}", groupId = "${kafka_groupId}")
    public void listen(String msg) {
        System.out.println("消费者消费消息:" + msg);
    }
}

Controller

@RestController
public class TestController {

    @Autowired
    private KafkaProducer kafkaProducer;

    @RequestMapping("/send")
    public String send(String msg) {
        kafkaProducer.sendMsg(msg);
        return msg;
    }
}

Configuration file application.properties

server.servlet.context-path=/kafka-learn

kafka_topic=topic1
kafka_groupId=group1
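These properties rely on Spring Boot's default broker address of localhost:9092. If Kafka runs elsewhere (for example the address used in the Docker setup above), the bootstrap servers would also need to be set explicitly, e.g.:

spring.kafka.bootstrap-servers=10.66.106.86:9092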

Open in a browser:

http://127.0.0.1:8080/kafka-learn/send?msg=aacopy

Then check the console output.

5. Commands

5.1 Topic Commands

5.1.1 List topics

./kafka-topics.sh --list --bootstrap-server 10.66.106.86:9092

5.1.2 Create a topic

./kafka-topics.sh --create --bootstrap-server 10.66.106.86:9092 --replication-factor 1 --partitions 2 --topic aacopy-topic

5.1.3 Delete a topic

./kafka-topics.sh --delete --bootstrap-server 10.66.106.86:9092 --topic aacopy-topic

This has a known issue and can fail with the following error, after which the broker shuts down:
ERROR Shutdown broker because all log dirs in /bitnami/kafka/data have failed (kafka.log.LogManager)

5.1.4 Produce test messages

./kafka-producer-perf-test.sh --topic aacopy-log-test --num-records 10000 --record-size 100 --throughput -1 --producer-props bootstrap.servers=10.66.106.86:9092

5.2 Segments

5.2.1 Dump a segment index file

./kafka-run-class.sh kafka.tools.DumpLogSegments --files /bitnami/kafka/data/aacopy-log-test-0/00000000000000000000.index --print-data-log

5.2.2 Dump a segment log file

./kafka-run-class.sh kafka.tools.DumpLogSegments --files /bitnami/kafka/data/aacopy-log-test-0/00000000000000000000.log --print-data-log

6. Messaging Models

Kafka supports two messaging models: point-to-point and publish-subscribe.

6.1 Point-to-point

All consumers belong to the same group, so each message is consumed by exactly one consumer.

Start a message producer:

./kafka-console-producer.sh --broker-list 10.66.106.86:9092 --topic aacopy-topic

In two terminal windows, run the following command to create two consumers in the same group (same --group value):

./kafka-console-consumer.sh --bootstrap-server 10.66.106.86:9092 --topic aacopy-topic --group group1

Type messages into the producer: they are consumed by the two consumers in turn, and each message is consumed by only one of them.

Inspect the group:

./kafka-consumer-groups.sh --bootstrap-server 10.66.106.86:9092 --list
./kafka-consumer-groups.sh --bootstrap-server 10.66.106.86:9092 --describe --group group1

Note:

To leave the producer/consumer windows inside Docker, use Ctrl+C. Do not detach with Ctrl+P+Q or suspend with Ctrl+Z, otherwise the consumer process keeps running and continues to consume data.

6.2 Publish-subscribe

Consumers are in different groups, and every group receives the published messages.

Start a message producer:

./kafka-console-producer.sh --broker-list 10.66.106.86:9092 --topic aacopy-topic

In two terminal windows, create two consumers in different groups (different --group values).

In the first window run:

./kafka-console-consumer.sh --bootstrap-server 10.66.106.86:9092 --topic aacopy-topic --group group1

In the second window run:

./kafka-console-consumer.sh --bootstrap-server 10.66.106.86:9092 --topic aacopy-topic --group group2

Type messages into the producer: both consumers receive and consume every message.

6.3 Combining the two models

Create four consumers.

Two of them run:

./kafka-console-consumer.sh --bootstrap-server 10.66.106.86:9092 --topic aacopy-topic --group group1

The other two run:

./kafka-console-consumer.sh --bootstrap-server 10.66.106.86:9092 --topic aacopy-topic --group group2

Inspect all consumer groups:

./kafka-consumer-groups.sh --bootstrap-server 10.66.106.86:9092 --describe --all-groups

Create a message producer:

./kafka-console-producer.sh --broker-list 10.66.106.86:9092 --topic aacopy-topic

Produce some messages: each message is consumed by both consumer groups, but only once within each group.

7. Producers

How a producer gets messages to Kafka.

  • The batch buffer

    • The producer does not send messages to the broker one at a time; messages first accumulate in an in-memory buffer, and once the buffer meets certain conditions the batched data is sent to the topic in one request.
    • The batch size defaults to 16 KB and is set with batch.size (in bytes); once the buffered messages exceed this threshold, the batch is sent.
    • If the buffered messages never reach the configured batch size, the batch is sent after a time limit instead, set with linger.ms. The default is 0, meaning messages are sent right away even if the batch is not full.
    • batch.size and linger.ms work together: whichever condition is met first triggers the send (a configuration sketch follows).
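    A minimal Spring Boot configuration sketch for these two settings (standard Spring Kafka property names; the values are illustrative, not recommendations):

    # send a batch once it reaches 16 KB (the default batch.size)
    spring.kafka.producer.batch-size=16384
    # or after waiting at most 5 ms for more records (linger.ms, default 0)
    spring.kafka.producer.properties.linger.ms=5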
  • How messages are assigned to partitions

    • If a partition id is specified, the message goes to that partition.
    • If a key is specified, the ProducerRecord is routed by the hash of the key, so messages with the same key always land in the same partition, which is how per-key ordering is achieved.
    • If neither an id nor a key is given, messages are spread across partitions using the sticky partitioning strategy.
    • If both an id and a key are given, the id takes precedence.
  • How the default partitioning strategy has changed

    • Before version 2.4 the default was round robin. The 2.3.1 source looks like this:

      /**
      * The default partitioning strategy:
      * <ul>
      * <li>If a partition is specified in the record, use it
      * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
      * <li>If no partition or key is present choose a partition in a round-robin fashion
      */
      public class DefaultPartitioner implements Partitioner {

      private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

      public void configure(Map<String, ?> configs) {}

      /**
      * Compute the partition for the given record.
      *
      * @param topic The topic name
      * @param key The key to partition on (or null if no key)
      * @param keyBytes serialized key to partition on (or null if no key)
      * @param value The value to partition on or null
      * @param valueBytes serialized value to partition on or null
      * @param cluster The current cluster metadata
      */
      public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
      List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
      int numPartitions = partitions.size();
      if (keyBytes == null) {
      int nextValue = nextValue(topic);
      List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
      if (availablePartitions.size() > 0) {
      int part = Utils.toPositive(nextValue) % availablePartitions.size();
      return availablePartitions.get(part).partition();
      } else {
      // no partitions are available, give a non-available partition
      return Utils.toPositive(nextValue) % numPartitions;
      }
      } else {
      // hash the keyBytes to choose a partition
      return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
      }
      }

      private int nextValue(String topic) {
      AtomicInteger counter = topicCounterMap.get(topic);
      if (null == counter) {
      counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
      AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
      if (currentCounter != null) {
      counter = currentCounter;
      }
      }
      return counter.getAndIncrement();
      }

      public void close() {}

      }
    • From version 2.4 on, the default is sticky partitioning.

      Reference: https://www.confluent.io/blog/apache-kafka-producer-improvements-sticky-partitioner/

      If we send several records to the same partition at around the same time, they can be sent as a batch

      Generally, smaller batches lead to more requests and queuing, resulting in higher latency.

      A batch is completed either when it reaches a certain size (batch.size) or after a period of time (linger.ms) is up.

      The default for batch.size is 16,384 bytes, and the default of linger.ms is 0 milliseconds.

      At first glance, it might seem like setting linger.ms to 0 would only lead to the production of single-record batches. However, this is usually not the case. Even when linger.ms is 0, the producer will group records into batches when they are produced to the same partition around the same time. This is because the system needs a bit of time to handle each request, and batches form when the system cannot attend to them all right away.

      Due to the potential for increased latency with small batches, the original strategy for partitioning records with null keys can be inefficient. This changes with Apache Kafka version 2.4, which introduces sticky partitioning, a new strategy for assigning records to partitions with proven lower latency.

      The sticky partitioner addresses the problem of spreading out records without keys into smaller batches by picking a single partition to send all non-keyed records. Once the batch at that partition is filled or otherwise completed, the sticky partitioner randomly chooses and “sticks” to a new partition. That way, over a larger period of time, records are about evenly distributed among all the partitions while getting the added benefit of larger batch sizes.

      The main goal of the sticky partitioner is to increase the number of records in each batch in order to decrease the total number of batches and eliminate excess queuing. When there are fewer batches with more records in each batch, the cost per record is lower and the same number of records can be sent more quickly using the sticky partitioning strategy. The data shows that this strategy does decrease latency in cases where null keys are used, and the effect becomes more pronounced when the number of partitions increases. In addition, CPU usage is often decreased when using the sticky partitioning strategy. By sticking to a partition and sending fewer but bigger batches, the producer sees great performance improvements.

The 3.0.0 source:

/**
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it
* <li>If no partition is specified but a key is present choose a partition based on a hash of the key
* <li>If no partition or key is present choose the sticky partition that changes when the batch is full.
*
* See KIP-480 for details about sticky partitioning.
*/
public class DefaultPartitioner implements Partitioner {

private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();

public void configure(Map<String, ?> configs) {}

/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes serialized key to partition on (or null if no key)
* @param value The value to partition on or null
* @param valueBytes serialized value to partition on or null
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
return partition(topic, key, keyBytes, value, valueBytes, cluster, cluster.partitionsForTopic(topic).size());
}

/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param numPartitions The number of partitions of the given {@code topic}
* @param key The key to partition on (or null if no key)
* @param keyBytes serialized key to partition on (or null if no key)
* @param value The value to partition on or null
* @param valueBytes serialized value to partition on or null
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
int numPartitions) {
if (keyBytes == null) {
return stickyPartitionCache.partition(topic, cluster);
}
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}

public void close() {}

/**
* If a batch completed for the current sticky partition, change the sticky partition.
* Alternately, if no sticky partition has been determined, set one.
*/
public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
}
}

And the StickyPartitionCache it delegates to:

/**
* An internal class that implements a cache used for sticky partitioning behavior. The cache tracks the current sticky
* partition for any given topic. This class should not be used externally.
*/
public class StickyPartitionCache {
private final ConcurrentMap<String, Integer> indexCache;
public StickyPartitionCache() {
this.indexCache = new ConcurrentHashMap<>();
}

public int partition(String topic, Cluster cluster) {
Integer part = indexCache.get(topic);
if (part == null) {
return nextPartition(topic, cluster, -1);
}
return part;
}

public int nextPartition(String topic, Cluster cluster, int prevPartition) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
Integer oldPart = indexCache.get(topic);
Integer newPart = oldPart;
// Check that the current sticky partition for the topic is either not set or that the partition that
// triggered the new batch matches the sticky partition that needs to be changed.
// oldPart == null means this is the first call for a newly seen topic; oldPart == prevPartition is true when a new batch has just been created
if (oldPart == null || oldPart == prevPartition) {
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() < 1) {
Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
newPart = random % partitions.size();
} else if (availablePartitions.size() == 1) {
newPart = availablePartitions.get(0).partition();
} else {
while (newPart == null || newPart.equals(oldPart)) {
int random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
newPart = availablePartitions.get(random % availablePartitions.size()).partition();
}
}
// Only change the sticky partition if it is null or prevPartition matches the current sticky partition.
if (oldPart == null) {
indexCache.putIfAbsent(topic, newPart);
} else {
indexCache.replace(topic, prevPartition, newPart);
}
return indexCache.get(topic);
}
return indexCache.get(topic);
}

}

Verifying this in code:

  1. Create a topic named aacopy with 5 partitions and 1 replica

    public void createTopic() throws ExecutionException, InterruptedException {
        // (topic name, number of partitions, number of replicas)
        NewTopic newTopic = new NewTopic("aacopy", 5, (short) 1);
        CreateTopicsResult topics = adminClient.createTopics(Arrays.asList(newTopic));
        topics.all().get();
    }
  2. With neither a partition id nor a key specified

    @Test
    public void sendWithNothing() {
        for (int i = 0; i < 100; i++) {
            ListenableFuture<SendResult<String, String>> sendMsg = kafkaTemplate.send("aacopy", "aacopy-msg-" + i);
            // add a callback to print the result
            sendMsg.addCallback(success -> {
                RecordMetadata rm = success.getRecordMetadata();
                System.out.println("消息发送成功:topic->" + rm.topic() + ", partition->" + rm.partition() + ", offset->" + rm.offset());
            }, failure -> System.out.println("消息发送失败:" + failure.getMessage()));
        }
    }

    Excerpt of the console output:

    ……

    消息发送成功:topic->aacopy, partition->2, offset->403
    消息发送成功:topic->aacopy, partition->2, offset->404
    消息发送成功:topic->aacopy, partition->2, offset->405
    消息发送成功:topic->aacopy, partition->2, offset->406
    消息发送成功:topic->aacopy, partition->2, offset->407
    消息发送成功:topic->aacopy, partition->1, offset->319
    消息发送成功:topic->aacopy, partition->1, offset->320
    消息发送成功:topic->aacopy, partition->1, offset->321
    消息发送成功:topic->aacopy, partition->1, offset->322

    ……

  3. With a partition id specified

    @Test
    public void sendWithId() {
    for(int i=0; i<100; i++) {
    ListenableFuture<SendResult<String, String>> sendMsg = kafkaTemplate.send("aacopy", 4, null, "aacopy-msg-" + i);
    sendMsg.addCallback(success -> {
    RecordMetadata rm = success.getRecordMetadata();
    System.out.println("消息发送成功:topic->" + rm.topic() + ", partition->" + rm.partition() + ", offset->" + rm.offset());
    }, failure -> System.out.println("消息发送失败:" + failure.getMessage()));
    }
    }

    Excerpt of the console output:

    ……

    消息发送成功:topic->aacopy, partition->4, offset->93
    消息发送成功:topic->aacopy, partition->4, offset->94
    消息发送成功:topic->aacopy, partition->4, offset->95
    消息发送成功:topic->aacopy, partition->4, offset->96
    消息发送成功:topic->aacopy, partition->4, offset->97
    消息发送成功:topic->aacopy, partition->4, offset->98
    消息发送成功:topic->aacopy, partition->4, offset->99

    All messages were sent to partition 4.

  4. With a key specified

    @Test
    public void sendWithKey() {
    for(int i=0; i<100; i++) {
    ListenableFuture<SendResult<String, String>> sendMsg = kafkaTemplate.send("aacopy", "HelloWorld", "aacopy-msg-" + i);
    sendMsg.addCallback(success -> {
    RecordMetadata rm = success.getRecordMetadata();
    System.out.println("消息发送成功:topic->" + rm.topic() + ", partition->" + rm.partition() + ", offset->" + rm.offset());
    }, failure -> System.out.println("消息发送失败:" + failure.getMessage()));
    }
    }

    Excerpt of the console output:

    ……

    消息发送成功:topic->aacopy, partition->0, offset->102
    消息发送成功:topic->aacopy, partition->0, offset->103
    消息发送成功:topic->aacopy, partition->0, offset->104
    消息发送成功:topic->aacopy, partition->0, offset->105
    消息发送成功:topic->aacopy, partition->0, offset->106
    消息发送成功:topic->aacopy, partition->0, offset->107

    ……

  5. With both a partition id and a key specified

    @Test
    public void sendWithIdAndKey() {
    for(int i=0; i<100; i++) {
    ListenableFuture<SendResult<String, String>> sendMsg = kafkaTemplate.send("aacopy", 4, "HelloWorld", "aacopy-msg-" + i);
    sendMsg.addCallback(success -> {
    RecordMetadata rm = success.getRecordMetadata();
    System.out.println("消息发送成功:topic->" + rm.topic() + ", partition->" + rm.partition() + ", offset->" + rm.offset());
    }, failure -> System.out.println("消息发送失败:" + failure.getMessage()));
    }
    }

    Excerpt of the console output:

    ……

    消息发送成功:topic->aacopy, partition->4, offset->333
    消息发送成功:topic->aacopy, partition->4, offset->334
    消息发送成功:topic->aacopy, partition->4, offset->335
    消息发送成功:topic->aacopy, partition->4, offset->336
    消息发送成功:topic->aacopy, partition->4, offset->337

    ……

The output confirms that when both a partition id and a key are present, the id takes precedence.

  • Custom partitioning rules

    A custom partitioner must implement the Partitioner interface, or extend a class that already implements it. Here we extend DefaultPartitioner so that messages we do not handle ourselves fall back to the default strategy.

    1. Write the custom partitioner class
    public class CustomizedPartitioner extends DefaultPartitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    if(key != null && key instanceof String) {
    String keyStr = (String) key;
    if(keyStr.startsWith("aacopy_")) {
    System.out.println("自定义分区策略...... key ->" + keyStr);
    return 0;
    }
    }
    return super.partition(topic, key, keyBytes, value, valueBytes, cluster);
    }
    }
    2. Add the configuration

    #set the partitioner class
    spring.kafka.producer.properties.partitioner.class=cn.aacopy.learn.kafka.CustomizedPartitioner
    3. Write a test

    @Test
    public void sendWithCustomizedPartitioner() {
    for(int i=0; i<100; i++) {
    ListenableFuture<SendResult<String, String>> sendMsg = kafkaTemplate.send("aacopy", "aacopy_" + i, "aacopy-msg-" + i);
    sendMsg.addCallback(success -> {
    RecordMetadata rm = success.getRecordMetadata();
    System.out.println("消息发送成功:topic->" + rm.topic() + ", partition->" + rm.partition() + ", offset->" + rm.offset());
    }, failure -> System.out.println("消息发送失败:" + failure.getMessage()));
    }
    }
    4. Excerpt of the console output:

    ……

    自定义分区策略…… key ->aacopy_96
    自定义分区策略…… key ->aacopy_97
    自定义分区策略…… key ->aacopy_98
    自定义分区策略…… key ->aacopy_98
    自定义分区策略…… key ->aacopy_99

    ……

    消息发送成功:topic->aacopy, partition->0, offset->729
    消息发送成功:topic->aacopy, partition->0, offset->730
    消息发送成功:topic->aacopy, partition->0, offset->731
    消息发送成功:topic->aacopy, partition->0, offset->732
    消息发送成功:topic->aacopy, partition->0, offset->733

    ……

8. Consumers

8.1 How consumers fetch messages

Kafka uses a pull model: consumers actively pull messages from the partitions.

Advantages:

  • Consumers process messages at their own pace. With a push model the service might not have enough capacity to keep up, and the peak-shaving benefit of an MQ would be lost.
  • If the broker has no messages, the client simply polls periodically; the request blocks until data arrives or a timeout expires before returning (see the sketch below).
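A minimal sketch of the pull model with the plain Kafka client (the broker address, topic and group name are reused from earlier sections and are assumptions, not requirements):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class PullDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.66.106.86:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("aacopy-topic"));
            while (true) {
                // the consumer pulls; poll() blocks for at most 1 second when no data is available
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.partition() + "-" + record.offset() + ": " + record.value());
                }
            }
        }
    }
}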

8.2 Partition assignment strategies

Configured with partition.assignment.strategy (exposed in code as ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG).

Default strategy: RangeAssignor (range assignment per topic).

.define(PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
Type.LIST,
Arrays.asList(RangeAssignor.class, CooperativeStickyAssignor.class), // default value
new ConfigDef.NonNullValidator(),
Importance.MEDIUM,
PARTITION_ASSIGNMENT_STRATEGY_DOC)

Kafka's built-in assignment strategies:

  • RangeAssignor

    Range assignment on a per-topic basis.

  • RoundRobinAssignor

    Round-robin assignment: partitions are assigned to consumers in a circular fashion.

  • StickyAssignor

    Guarantees an assignment that is as balanced as possible while preserving as many existing partition assignments as possible.

  • CooperativeStickyAssignor

    Follows the same logic as StickyAssignor, but allows cooperative rebalancing.

The default assignor list is [RangeAssignor, CooperativeStickyAssignor]: RangeAssignor is used by default, but an upgrade to CooperativeStickyAssignor is possible simply by removing RangeAssignor from the list (a property sketch follows).

A custom strategy can also be provided by implementing org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.
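With Spring Boot, for example, the strategy can be switched through a pass-through consumer property (a sketch; the class below is the built-in cooperative assignor):

spring.kafka.consumer.properties.partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor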

8.2.1 RoundRobinAssignor

How it works: take all partitions of all topics subscribed to by the consumer group and the list of consumers in the group, then iterate over the partitions in an outer loop and over the consumers in an inner loop, handing each partition to the next consumer in turn.

Example 1:

Example 2:

Official example:

When subscriptions differ across consumer instances, the assignment process still considers each consumer instance in round robin fashion but skips over an instance if it is not subscribed to the topic. Unlike the case when subscriptions are identical, this can result in imbalanced assignments. For example, we have three consumers C0, C1, C2, and three topics t0, t1, t2, with 1, 2, and 3 partitions, respectively. Therefore, the partitions are t0p0, t1p0, t1p1, t2p0, t2p1, t2p2. C0 is subscribed to t0; C1 is subscribed to t0, t1; and C2 is subscribed to t0, t1, t2.
That assignment will be:
C0: [t0p0]
C1: [t1p0]
C2: [t1p1, t2p0, t2p1, t2p2]

If different consumers in the same group subscribe to different topics, the assignment can become unbalanced; in the example above, t1p1 could just as well have gone to C1 to even things out.

Source code:

public class RoundRobinAssignor extends AbstractPartitionAssignor {

@Override
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
Map<String, Subscription> subscriptions) {
Map<String, List<TopicPartition>> assignment = new HashMap<>();
// collect all consumers in the group
List<MemberInfo> memberInfoList = new ArrayList<>();
for (Map.Entry<String, Subscription> memberSubscription : subscriptions.entrySet()) {
assignment.put(memberSubscription.getKey(), new ArrayList<>());
memberInfoList.add(new MemberInfo(memberSubscription.getKey(),
memberSubscription.getValue().groupInstanceId()));
}

CircularIterator<MemberInfo> assigner = new CircularIterator<>(Utils.sorted(memberInfoList));

for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {
final String topic = partition.topic();
// skip consumers that are not subscribed to the topic of the current partition
while (!subscriptions.get(assigner.peek().memberId).topics().contains(topic))
assigner.next();
assignment.get(assigner.next().memberId).add(partition);
}
return assignment;
}

// collect all partitions of all topics subscribed to within the group
private List<TopicPartition> allPartitionsSorted(Map<String, Integer> partitionsPerTopic,
Map<String, Subscription> subscriptions) {
SortedSet<String> topics = new TreeSet<>();
for (Subscription subscription : subscriptions.values())
topics.addAll(subscription.topics());

List<TopicPartition> allPartitions = new ArrayList<>();
for (String topic : topics) {
Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
if (numPartitionsForTopic != null)
allPartitions.addAll(AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic));
}
return allPartitions;
}

@Override
public String name() {
return "roundrobin";
}

}

8.2.2 RangeAssignor

How it works: take all topics subscribed to by the group; for each topic, take its consumers and its partition count, divide the partition count by the consumer count to get the base number of partitions per consumer, and if there is a remainder, the first consumers in sorted order each get one extra partition.

Example 1:

Official example 2:

For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
The assignment will be:
C0: [t0p0, t0p1, t1p0, t1p1]
C1: [t0p2, t1p2]

With many topics, this scheme puts more load on the consumers that sort first than on the later ones.

Source code:

public class RangeAssignor extends AbstractPartitionAssignor {

@Override
public String name() {
return "range";
}

private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {
Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
String consumerId = subscriptionEntry.getKey();
MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
for (String topic : subscriptionEntry.getValue().topics()) {
put(topicToConsumers, topic, memberInfo);
}
}
return topicToConsumers;
}

@Override
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
Map<String, Subscription> subscriptions) {
Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);

Map<String, List<TopicPartition>> assignment = new HashMap<>();
for (String memberId : subscriptions.keySet())
assignment.put(memberId, new ArrayList<>());
// iterate over all topics
for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
String topic = topicEntry.getKey();
// consumers subscribed to this topic
List<MemberInfo> consumersForTopic = topicEntry.getValue();
// number of partitions of this topic
Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
if (numPartitionsForTopic == null)
continue;
// sort the consumers
Collections.sort(consumersForTopic);
// base number of partitions per consumer
int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
// number of partitions that cannot be divided evenly
int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();

List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
// iterate over the subscribed consumers
for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
// start index of this consumer's partition range
int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
// whether this consumer gets one extra partition
int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
assignment.get(consumersForTopic.get(i).memberId).addAll(partitions.subList(start, start + length));
}
}
return assignment;
}
}

8.2.3 StickyAssignor

The sticky assignor has two goals:

  1. Keep the assignment as balanced as possible.
  2. When a reassignment happens, preserve as many existing assignments as possible, which saves overhead when topic partitions move from one consumer to another.
  • Example 1:

Suppose there are three consumers C0, C1, C2 and four topics t0, t1, t2, t3, each with 2 partitions, giving partitions t0p0, t0p1, t1p0, t1p1, t2p0, t2p1, t3p0, t3p1. Each consumer subscribes to all of the topics.

The sticky assignor and the round-robin assignor produce the same assignment:

C0: [t0p0, t1p1, t3p0]
C1: [t0p1, t2p0, t3p1]
C2: [t1p0, t2p1]

Now suppose C1 is removed.

  • Round robin would reassign as follows:

C0: [t0p0, t1p0, t2p0, t3p0]
C2: [t0p1, t1p1, t2p1, t3p1]

  • The sticky assignor would reassign as follows:

C0 [t0p0, t1p1, t3p0, t2p0]
C2 [t1p0, t2p1, t0p1, t3p1]

Notice that the sticky assignor preserves all of the previous assignments.

  • Example 2

There are three consumers C0, C1, C2 and three topics t0, t1, t2 with 1, 2 and 3 partitions respectively, so the partitions are t0p0, t1p0, t1p1, t2p0, t2p1, t2p2. C0 subscribes to t0; C1 subscribes to t0 and t1; C2 subscribes to t0, t1 and t2.

  • Round-robin assignment:

C0 [t0p0]
C1 [t1p0]
C2 [t1p1, t2p0, t2p1, t2p2]

  • Sticky assignment:

C0 [t0p0]
C1 [t1p0, t1p1]
C2 [t2p0, t2p1, t2p2]

If consumer C0 is removed:

Round robin reassigns as follows:

C1 [t0p0, t1p1]
C2 [t1p0, t2p0, t2p1, t2p2]

Sticky reassigns as follows:

C1 [t1p0, t1p1, t0p0]
C2 [t2p0, t2p1, t2p2]

8.2.4 CooperativeStickyAssignor

A cooperative version of AbstractStickyAssignor. It follows the same (sticky) assignment logic as StickyAssignor, but allows cooperative rebalancing.

Kafka has two rebalance protocols: EAGER and COOPERATIVE.

  • The EAGER rebalance protocol requires a consumer to always revoke all of its partitions before taking part in a rebalance event, so it allows a complete reshuffling of the assignment.

  • The COOPERATIVE rebalance protocol allows a consumer to keep its currently owned partitions while taking part in a rebalance event. Instead of immediately reassigning any owned partition, the assignor can signal to a consumer that a partition needs to be revoked, so that the revoked partition can be reassigned to another consumer in the next rebalance event. This is designed for the sticky assignment logic, which tries to minimize partition movement through cooperative adjustments.

EAGER protocol: RangeAssignor, RoundRobinAssignor, StickyAssignor

COOPERATIVE protocol: CooperativeStickyAssignor

In a large cluster where nodes come and go frequently and subscriptions change often, EAGER touches every partition during a rebalance, while COOPERATIVE only touches the partitions that actually need to move, eventually converging on a balanced assignment.

8.3 Tracking consumption with offsets

  • Consumers use offsets to record how far each consumer group has consumed

    • The combination group.id + topic + partition.id identifies a unique offset key; the value is the offset itself.

    • Kafka stores these offsets in an internal topic named __consumer_offsets, which has 50 partitions by default.

  • How to consume a partition from the beginning

    • Two things are required:
      • set auto.offset.reset to earliest (the default is latest)
      • use a brand-new consumer group
  • Committing offsets manually

    • Offsets are committed automatically by default. In many real scenarios the business logic needs to decide whether a message was processed successfully before the offset is committed.

    • Set enable.auto.commit to false.

    • There are two ways to commit manually (see the sketch after this list):

      • synchronous, blocking commit: kafkaConsumer.commitSync()
      • asynchronous commit: kafkaConsumer.commitAsync(), with an onComplete callback for custom handling
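A minimal sketch of manual committing with the plain consumer API, using the same imports as the pull-model sketch in 8.1 (the broker address and topic are reused from earlier sections; the group name and processRecord are made-up placeholders):

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.66.106.86:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "manual-commit-group");      // illustrative new group name
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");          // turn off auto commit
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");        // a new group starts from the beginning
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("aacopy-topic"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
            processRecord(record);             // business logic decides whether consumption succeeded
        }
        if (!records.isEmpty()) {
            consumer.commitSync();             // blocking commit
            // asynchronous alternative:
            // consumer.commitAsync((offsets, ex) -> { if (ex != null) { /* handle failure */ } });
        }
    }
}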

9. Transactional Messages

Transactional messages guarantee atomicity across multiple message writes.

  • Configuration

    • Transactional messages require acks to be all or -1.
    • A transaction id prefix must be set: transaction-id-prefix.
  • Producing transactional messages

    • Annotation style

      Annotate the method with @Transactional(rollbackFor = Exception.class) (a sketch follows the executeInTransaction example below).

    • executeInTransaction

      kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback() {
          @Override
          public Object doInOperations(KafkaOperations kafkaOperations) {
              kafkaOperations.send(TOPIC_NAME, "aaa");
              int i = 1 / 0;
              kafkaOperations.send(TOPIC_NAME, "bbb");
              return true;
          }
      });
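A sketch of the annotation-based variant (it assumes the transaction-id-prefix from 9.1 is configured so that Spring Boot sets up Kafka transactions; the service class and topic name are made up for illustration):

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class TxMessageService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // both sends commit atomically; the exception below rolls the Kafka transaction back,
    // so neither record becomes visible to read_committed consumers
    @Transactional(rollbackFor = Exception.class)
    public void sendInTransaction() {
        kafkaTemplate.send("aacopy-topic", "aaa");
        int i = 1 / 0;   // simulated failure
        kafkaTemplate.send("aacopy-topic", "bbb");
    }
}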

9.1 Common errors

  • Producer factory does not support transactions

    • Add the transaction id prefix:

      spring.kafka.producer.transaction-id-prefix=tx_
  • Must set retries to non-zero when using the idempotent producer.

    • Increase the retry count:

      spring.kafka.producer.retries=3
  • Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence.

    • Change the ack mode:

      spring.kafka.producer.acks=-1

10. Message Interceptors

spring-kafka can intercept messages before they are sent and before they are consumed, and modify them in the process. This can be used, for example, to carry a user token, integrate with Spring Security, and propagate the user across the async boundary.

  • Interceptor interface on the producer side: ProducerInterceptor
  • Interceptor interface on the consumer side: ConsumerInterceptor

Implementing these two interfaces and adding a bit of configuration is enough for a simple setup.

  • ProducerInterceptor implementation

package cn.aacopy.learn.simpleboot.config;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Random;

/**
* @author cmyang
* @date 2022/12/15 21:11
*/
public class AuthProducerInterceptor implements ProducerInterceptor<Object, Object> {
@Override
public ProducerRecord<Object, Object> onSend(ProducerRecord<Object, Object> record) {
Headers headers = record.headers();
headers.add("token", ("abc"+new Random().nextInt()).getBytes(StandardCharsets.UTF_8));
System.out.println(Thread.currentThread().getName()+"=================生成自定义消息头");
return record;
}

@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

}

@Override
public void close() {

}

@Override
public void configure(Map<String, ?> configs) {

}
}

  • ConsumerInterceptor implementation

package cn.aacopy.learn.simpleboot.config;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;

import java.util.Map;

/**
* @author cmyang
* @date 2022/12/15 21:30
*/
public class AuthConsumerInterceptor implements ConsumerInterceptor<Object, Object> {
@Override
public ConsumerRecords<Object, Object> onConsume(ConsumerRecords<Object, Object> records) {
System.out.println("{{{{{{{{{{{" + records.count());
for (ConsumerRecord<Object, Object> record : records) {
Headers headers = record.headers();
System.out.println(Thread.currentThread().getId()+"+++++++++++++++++++++++获取到请求头" + headers);
}
return records;
}

@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {

}

@Override
public void close() {

}

@Override
public void configure(Map<String, ?> configs) {

}
}

  • Add the configuration:

spring:
  kafka:
    bootstrap-servers: 192.168.80.128:9092
    producer:
      properties: {"interceptor.classes": "cn.aacopy.learn.simpleboot.config.AuthProducerInterceptor"}
    consumer:
      properties: {"interceptor.classes": "cn.aacopy.learn.simpleboot.config.AuthConsumerInterceptor"}
  • Remaining problems:

    • This configuration is global. Putting heavy logic in the interceptors works against Kafka's high throughput, so if the logic is complex it is better to apply it only where needed.
    • ConsumerInterceptor works on whole batches, so it cannot set thread-local state per individual message.
  • Approach:

    • On the producer side, create a subclass of KafkaTemplate used only for these special sends, and keep the original KafkaTemplate working as before.
    • On the consumer side, add an aspect around the listener method to process the message data.
  • Add AuthKafkaTemplate

package cn.aacopy.learn.simpleboot.config;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.Map;

/**
* @author cmyang
* @date 2022/12/15 23:51
*/
public class AuthKafkaTemplate<K, V> extends KafkaTemplate<K, V> {
public AuthKafkaTemplate(ProducerFactory producerFactory) {
super(producerFactory);
}

public AuthKafkaTemplate(ProducerFactory producerFactory, Map configOverrides) {
super(producerFactory, configOverrides);
}

public AuthKafkaTemplate(ProducerFactory producerFactory, boolean autoFlush) {
super(producerFactory, autoFlush);
}

public AuthKafkaTemplate(ProducerFactory producerFactory, boolean autoFlush, Map configOverrides) {
super(producerFactory, autoFlush, configOverrides);
}
}

  • The producer-side interceptor is the same AuthProducerInterceptor shown above.

  • Register the beans in the Spring container

85
package cn.aacopy.learn.simpleboot.config;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.converter.RecordMessageConverter;

import java.util.Map;

/**
* @author cmyang
* @date 2022/12/15 23:53
*/
@Configuration
public class KafkaConfig {

private final KafkaProperties properties;

public KafkaConfig(KafkaProperties properties) {
this.properties = properties;
}

@Bean("authKafkaProducerFactory")
public DefaultKafkaProducerFactory<?, ?> authKafkaProducerFactory(
ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {
Map<String, Object> map = this.properties.buildProducerProperties();
map.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, AuthProducerInterceptor.class.getName());
DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(map);
String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
if (transactionIdPrefix != null) {
factory.setTransactionIdPrefix(transactionIdPrefix);
}
customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
return factory;
}

@Bean
public DefaultKafkaProducerFactory<?, ?> kafkaProducerFactory(
ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {
DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(
this.properties.buildProducerProperties());
String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
if (transactionIdPrefix != null) {
factory.setTransactionIdPrefix(transactionIdPrefix);
}
customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
return factory;
}

@Bean("authKafkaTemplate")
public AuthKafkaTemplate<?, ?> authKafkaTemplate(ProducerFactory<Object, Object> authKafkaProducerFactory,
ProducerListener<Object, Object> kafkaProducerListener,
ObjectProvider<RecordMessageConverter> messageConverter) {
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
AuthKafkaTemplate<Object, Object> authKafkaTemplate = new AuthKafkaTemplate<>(authKafkaProducerFactory);
messageConverter.ifUnique(authKafkaTemplate::setMessageConverter);
map.from(kafkaProducerListener).to(authKafkaTemplate::setProducerListener);
map.from(this.properties.getTemplate().getDefaultTopic()).to(authKafkaTemplate::setDefaultTopic);
map.from(this.properties.getTemplate().getTransactionIdPrefix()).to(authKafkaTemplate::setTransactionIdPrefix);
return authKafkaTemplate;
}

@Bean
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
ProducerListener<Object, Object> kafkaProducerListener,
ObjectProvider<RecordMessageConverter> messageConverter) {
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
map.from(kafkaProducerListener).to(kafkaTemplate::setProducerListener);
map.from(this.properties.getTemplate().getDefaultTopic()).to(kafkaTemplate::setDefaultTopic);
map.from(this.properties.getTemplate().getTransactionIdPrefix()).to(kafkaTemplate::setTransactionIdPrefix);
return kafkaTemplate;
}


}
  • Add a custom annotation on the consumer side

package cn.aacopy.learn.simpleboot.config;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* @author cmyang
* @date 2022/12/16 0:11
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface AuthRequired {
}

  • Add the aspect class

package cn.aacopy.learn.simpleboot.config;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Headers;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.springframework.stereotype.Component;

/**
* @author cmyang
* @date 2022/12/16 0:32
*/
@Component
@Aspect
public class AuthPre {

@Before("@annotation(cn.aacopy.learn.simpleboot.config.AuthRequired)")
public void before(JoinPoint joinPoint) {
System.out.println("前置通知执行..." + joinPoint.getSignature().toShortString());
Object[] args = joinPoint.getArgs();
for (Object arg : args) {
if(arg instanceof ConsumerRecord) {
ConsumerRecord consumerRecord = (ConsumerRecord) arg;
Headers headers = consumerRecord.headers();
System.out.println("获取到请求头 -->" + headers);
break;
}
}
}
}

  • Test the producer side

package cn.aacopy.learn.simpleboot;

import cn.aacopy.learn.simpleboot.config.AuthKafkaTemplate;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

import javax.annotation.Resource;

@SpringBootTest
class SimplebootApplicationTests {

@Resource
private KafkaTemplate<String, String> kafkaTemplate;
@Resource
private AuthKafkaTemplate<String, String> authKafkaTemplate;

@Test
void contextLoads() throws Exception {
for (int i = 0; i < 2; i++) {
// Thread.sleep(500);
kafkaTemplate.send("topic1", "hahahah"+System.currentTimeMillis());
System.out.println(Thread.currentThread().getName()+"=====================");
}
authKafkaTemplate.send("topic1", "hihihihi"+System.currentTimeMillis());
}

}

  • Test the consumer side

@KafkaListener(id = "myId", topics = "topic1")
@AuthRequired
public void listen(ConsumerRecord<String, String> record) {
Headers headers = record.headers();
Header[] headers1 = headers.toArray();
for (Header header : headers1) {
System.out.println(Thread.currentThread().getId()+"++++++++++++++++++++header " + header);
}
System.out.println(Thread.currentThread().getId()+"++++++++++++++++++++++++++++消费者消费消息:"+record.value());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

11. File Storage

  • A Kafka cluster consists of multiple brokers

  • A Kafka node is a broker, and a broker hosts multiple topics

  • A topic is divided into multiple partitions; each partition is an ordered queue and a physical subdivision of the topic

  • A partition is divided into multiple segments

11.1 Where the files live

The broker configuration file server.properties contains the log.dirs setting, which configures (and tells you) where the data files are stored.

# A comma separated list of directories under which to store log files
log.dirs=/bitnami/kafka/data

Looking at the storage directory:

Different partitions of the same topic get their own directories, named topic name + index, starting from 0. For example, aacopy-topic with two partitions has aacopy-topic-0 and aacopy-topic-1.

  • Why does a topic have multiple partitions?

    With only one partition, the topic would be confined to a single server node; it could not scale out and would inevitably hit a performance bottleneck.

11.2 File layout

11.3 Segments within a partition

11.3.1 Storage structure

  • A segment consists of two parts, an index file and a data file, which always come in pairs; the ".index" suffix marks the segment index file and ".log" the data file.

  • Segment file naming: the first segment of a partition starts at 0, and each subsequent segment is named after the offset of the last message in the previous segment plus 1. The number is a 64-bit long rendered as 20 digits, left-padded with zeros.

  • The index file contains pairs of values: offset (the message's unique id within the partition) and position (the physical location of the message in the log file).

    The offsets in the index file are not contiguous, because the index does not cover every message. Kafka uses a sparse index, adding an entry at intervals controlled by log.index.interval.bytes (4 KB by default); a sparse index lets far more index entries fit in memory.

  • How a message is located

    • Because offsets are sequential, a binary search over the segment file names quickly narrows down which file holds the target offset, and a second binary search inside that index file finds the nearest index entry. Since the index is sparse it may not be an exact hit, so a short scan from that position finds the physical location of the offset; the range to scan is small. (A sketch of the lookup idea follows this list.)
    • A textbook binary search over the index is cache-unfriendly and can cause unnecessary page faults (the thread blocks while data that is not in the page cache is loaded from disk), pushing latency from a few milliseconds to around a second. Kafka splits the index entries into a hot zone and a cold zone; queries over hot data always traverse the same fixed pages, avoiding those page faults.
  • Kafka has three kinds of index: the offset index, the timestamp index and the aborted-transaction index, stored in .index, .timeindex and .txnindex files respectively.

  • Why is a segment split into multiple files?

    To make it easy to delete historical data and reclaim space. If everything lived in one file, writing at the end while purging old data at the front would be very inefficient.
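A sketch of the lookup idea (not Kafka source code): the base offsets taken from the segment file names form a sorted map, and a floor lookup picks the segment that contains a given offset. The file names reuse the ones from the test in 11.3.2.

import java.util.TreeMap;

public class SegmentLookupSketch {
    public static void main(String[] args) {
        // base offset of each segment -> segment file name (from the naming rule above)
        TreeMap<Long, String> segments = new TreeMap<>();
        segments.put(0L, "00000000000000000000.log");
        segments.put(9261L, "00000000000000009261.log");

        long targetOffset = 9500L;
        // floorEntry plays the role of the binary search over file names:
        // offset 9500 is >= 9261, so it lives in the 00000000000000009261 segment
        System.out.println("offset " + targetOffset + " -> " + segments.floorEntry(targetOffset).getValue());
    }
}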

11.3.2 Hands-on verification

  1. Create a new topic with 1 partition and a cap on the segment file size

    ./kafka-topics.sh --create --bootstrap-server 10.66.106.86:9092 --replication-factor 1 --partitions 1 --config segment.bytes=1024000 --config segment.index.bytes=512000 --topic aacopy-log-test
  2. Produce a large amount of data into the topic, using the perf tool or code

    ./kafka-producer-perf-test.sh --topic aacopy-log-test --num-records 10000 --record-size 100 --throughput -1 --producer-props bootstrap.servers=10.66.106.86:9092
  3. List the segment files

    ls -l /bitnami/kafka/data/aacopy-log-test-0/

  4. Dump the segment index files

    ./kafka-run-class.sh kafka.tools.DumpLogSegments --files /bitnami/kafka/data/aacopy-log-test-0/00000000000000000000.index --print-data-log
    ./kafka-run-class.sh kafka.tools.DumpLogSegments --files /bitnami/kafka/data/aacopy-log-test-0/00000000000000009261.index --print-data-log

  5. Dump the segment log files

    ./kafka-run-class.sh kafka.tools.DumpLogSegments --files /bitnami/kafka/data/aacopy-log-test-0/00000000000000000000.log --print-data-log
    ./kafka-run-class.sh kafka.tools.DumpLogSegments --files /bitnami/kafka/data/aacopy-log-test-0/00000000000000009261.log --print-data-log

  6. Alternatively, write a bit of code that inserts random double values

    public void initLargeAmountMsg(String topic, int num) {
        Random random = new Random();
        for (int i = 0; i < num; i++) {
            template.send(topic, String.valueOf(random.nextDouble()));
        }
    }

  7. The offset of the first message in each .log file matches the number in the file name.

11.4 Log cleanup

Kafka runs a scheduled task that periodically checks for log files to delete. The interval is configured with log.retention.check.interval.ms and defaults to 5 minutes; the number of cleaner threads is configured with log.cleaner.threads.

Cleanup policy (log.cleanup.policy); the settings are gathered into a configuration sketch after this list:

  • Plain deletion, log.cleanup.policy=delete

    • Delete after a retention period

      log.retention.hours=168, i.e. 168 hours (7 days) by default.

      Each segment keeps the timestamp of the messages written to it; once a segment is full, the timestamp stops being updated, and Kafka compares the current time with that timestamp to decide whether the segment should be deleted.

    • Delete once the log exceeds a size limit

      log.retention.bytes=1073741824 deletes data once the log exceeds 1 GB.

      When the total size of the already-full log files exceeds the configured threshold, deletion is triggered.

  • Log compaction, log.cleanup.policy=compact
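Gathered into a server.properties sketch (the values are the ones quoted above, not recommendations):

    # how often the cleanup task checks for segments to delete (5 minutes)
    log.retention.check.interval.ms=300000
    # number of cleaner threads (default 1)
    log.cleaner.threads=1
    # delete closed segments older than 7 days, or once a partition log exceeds 1 GB
    log.cleanup.policy=delete
    log.retention.hours=168
    log.retention.bytes=1073741824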

12. Message Reliability

12.1 Kafka replicas

Each partition can have multiple replicas. Among them there is one leader and several followers: the leader handles all interaction with producers and consumers, while the followers pull messages from the leader as backups. If the leader fails, a new leader is elected from the followers.

12.2 ACKs in the send path

The producer sends a message to the leader partition of the topic; after receiving it, the partition sends an ack back to the producer to confirm receipt.

12.2.1 CAP

CAP refers to Consistency, Availability and Partition tolerance in a distributed system.

The three cannot all be satisfied at once; only two can be guaranteed. P is usually non-negotiable, so the trade-off between consistency and availability is made according to the business requirements.

12.2.2 ACK levels

Kafka uses the ack level to determine how reliable message delivery is: lean towards CP if reliability matters more, or towards AP if throughput matters more.

The level is set with the acks producer parameter.

  • acks=0

    No acknowledgement: the producer sends the message once and does not care whether the broker received it. Highest throughput, lowest reliability; if the broker hosting the leader partition crashes during the send, the message is lost.

  • acks=1

    The leader partition acknowledges as soon as it has written the message to its local log, without waiting for the follower partitions to replicate it. (This was the client default before Kafka 3.0; since 3.0 the producer defaults to acks=all.)

  • acks=all: the leader waits until every follower in the ISR has replicated the message before acknowledging. Highest reliability. It can also be written as -1, which is equivalent to all.

For details see the official documentation in ProducerConfig.ACKS_DOC.

12.2.3 ISR (in-sync replicas)

The set of replicas that are kept in sync with the leader; the leader itself is part of this set.

  • min.insync.replicas sets the minimum number of replicas in the ISR (default 1); when a producer uses acks=all and the ISR shrinks below this value, writes fail with an exception (an example of setting it per topic follows this list).
  • A follower that lags behind the leader for too long (timeout set by replica.lag.time.max.ms, default 30 seconds) is removed from the ISR and moves to the OSR.
  • If the leader fails, a new leader is elected from the ISR.
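min.insync.replicas can be set per topic with the kafka-configs tool, for example (broker address and topic name reused from earlier sections):

./kafka-configs.sh --bootstrap-server 10.66.106.86:9092 --alter --entity-type topics --entity-name aacopy-topic --add-config min.insync.replicas=2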

12.2.4 OSR (out-of-sync replicas)

The set of replicas that have fallen too far behind the leader.

12.2.5 High Watermark

  • HW (High Watermark)

    • The offset up to which every replica in the ISR has synchronized.
    • Only data below the HW is visible to consumers.
  • LEO (log end offset)

    • The offset of the last message in the partition.
  • The HW keeps consumption consistent and keeps the replicas consistent with each other

    • If the leader fails, a new leader is elected from the ISR; the other followers truncate their logs above the HW and then re-sync from the new leader, which keeps the replica data consistent.
    • If a follower fails, the leader removes it from the ISR. When the follower recovers, it truncates everything above the HW it recorded before failing and then re-syncs; once its LEO catches up with the current HW, it rejoins the ISR.

12.3 Summary

  • The multi-replica design of partitions makes message storage reliable.
  • The ACK mechanism makes message delivery by the producer reliable.
  • The High Watermark keeps consumption consistent and keeps replica data consistent.

13. Why Kafka Is Fast

  • Message file storage

    • A Kafka cluster consists of multiple brokers

    • A broker hosts multiple topics

    • A topic is divided into multiple partitions; each partition is an ordered queue and a physical subdivision of the topic

    • A partition is divided into multiple segments

  • Indexing

    • Binary search combined with a sparse index makes lookups fast
  • Message buffering

    • Produced messages do not go straight to the broker; they pass through a buffer and are sent in batches once the buffer fills up or a time limit expires
  • Sequential disk writes

  • Use of the operating system's page cache instead of a JVM-level cache, which would cause GC pressure

  • Zero copy