Kafka命令行操作

创建 topic

1
./kafka-topics.sh --zookeeper master:2181,slave1:2181,slave2:2181 --create --replication-factor 3 --partitions 3 --topic test_2

image-20220514215128761

1
2
3
--replication-factor 副本数量
--partitions 分区数量
--topic topic 名称
  • 手动指定副本的存储位置

    1
    ./kafka-topics.sh --create --topic test_3 --zookeeper master:2181 --replica-assignment 0:1,1:2

    image-20220514215233630.png

删除 topic

1
./kafka-topics.sh  --delete --topic tpc_1 --zookeeper master:2181
1
异步线程去删除)删除 topic,需要一个参数处于启用状态: delete.topic.enable = true,否则删不掉

查看 topic

  • 列出当前系统中的所有 topic

    1
    kafka-topics.sh --list --zookeeper master:2181

    image-20220514215549569

  • 查看 topic 详细信息

    1
    2
    3
    ./kafka-topics.sh --create --topic tpc_1   --zookeeper master:2181 --replica-assignment 0:1,1:2

    ./kafka-topics.sh --describe --topic tpc_1 --zookeper master:2181

增加分区数

1
./kafka-topics.sh --alter --topic test_3 --partitions 3 --zookeeper master:2181

image-20220514215930035

1
Kafka 只支持增加分区,不支持减少分区

动态配置 topic 参数

  • 通过管理命令,可以为已创建的 topic 增加、修改、删除 topic level 参数

  • 添加、修改配置参数(开启压缩发送传输种提高kafka消息吞吐量的有效办法(‘gzip’, ‘snappy’, ‘lz4’, ‘zstd’))

    1
    ./kafka-configs.sh --zookeeper master:2181 --entity-type topics --entity-name tpc_1 --alter --add-config compression.type=gzip 

    image-20220514220000191

  • 删除配置参数

    1
    ./kafka-configs.sh --zookeeper master:2181 --entity-type topics --entity-name tpc_1 --alter --delete-config compression.type

    image-20220514220029815

Kafka命令行生产者与消费者操作

  • 生产者:kafka-console-producer

    1
    ./kafka-console-producer.sh --broker-list master:9092, slave1:9092, slave2:9092 --topic tpc_1

    image-20220514214530805

  • 消费者:kafka-console-consumer

  • 消费消息

    1
    ./kafka-console-consumer.sh --bootstrap-server master:9092, slave1:9092, slave2:9092 --topic tpc_1 --from-beginning
  • 指定要消费的分区,和要消费的起始 offset

    1
    ./kafka-console-consumer.sh --bootstrap-server master:9092,slave1:9092,slave2:9092 --topic tcp_1 --offset 2 --partition 0

配置管理 kafka-configs

  • 比如查看 topic 的配置可以按如下方式执行:

    1
    ./kafka-configs.sh zookeeper master: 2181 --describe --entity-type topics --entity-name tpc_2 

    image-20220514220122026

  • 比如查看 broker 的动态配置可以按如下方式执行:

    1
    ./kafka-configs.sh zookeeper master: 2181 --describe --entity-type brokers --entity-name 0 --zookeeper master:2181

    image-20220514220138188

Kafka生产者api示例

生产者api示例

一个正常的生产逻辑需要具备以下几个步骤

1
2
3
4
(1)配置生产者客户端参数及创建相应的生产者实例
(2)构建待发送的消息
(3)发送消息
(4)关闭生产者实例
  • 示例代码(部分截取)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Properties props = new Properties(); 

//设置 kafka 集群的地址
props.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");

//ack 模式,取值有 0,1,-1(all) , all 是最慢但最安全的,
props.put(“acks”, “all”);

//失败重试次数->失败会自动重试(可恢复/不可恢复)-->(有可能会造成数据的乱序)
props.put(“retries”, 3);

//数据发送的批次大小提高效率/吞吐量太大会数据延迟
props.put(“batch.size”, 10);

//消息在缓冲区保留的时间,超过设置的值就会被提交到服务端
props.put("linger.ms", 10000);

//数据发送请求的最大缓存数
props.put("max.request.size",10);

//整个 Producer 用到总内存的大小,如果缓冲区满了会提交数据到服务端 buffer.memory 要大于 batch.size,否则会报申请内存不足的错误降低阻塞的可能性
props.put(“buffer.memory”, 10240);

//key-value序列化器
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

//字符串最好
props.put(“value.serializer”, “org.apache.kafka.common.serialization.StringSerializer”);
  • 消息对象 ProducerRecord,它并不是单纯意义上的消息,它包含了多个属性,原本需要发送的与业务关的消息体只是其中的一个 value 属性 ,比“ Hello, rgzn!”只是 ProducerRecord 对象的一个属性。
1
2
3
4
5
6
7
8
9
ProducerRecord 类的定义如下:
public class ProducerRecord<K, V> {
private final String topic;
private final Integer partition;
private final Headers headers;
private final K key;
private final V value;
private final Long timestamp;
}

必要参数配置

  • 在创建真正的生产者实例前需要配置相应的参数,比如需要连接的 Kafka 集群地址。在 Kafka 生产者客户端 KatkaProducer 中有 3 个参数是必填的。
1
2
3
* bootstrap.servers 
* key.serializer
* value.serializer
  • 为了防止参数名字符串书写错误,可以使用如下方式进行设置:
1
2
3
props.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,ProducerInterceptorPrefix.class.getName());
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"master:9092,slave1:9092");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

生产者api参数发送方式

这个客户端经过了生产环境测试并且通常情况它比原来Scals客户端更加快速、功能更加齐全。你可以通过添加以下示例的Maven坐标到客户端依赖中来使用这个新的客户端(你可以修改版本号来使用新的发布版本):

1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
</dependency>

发后即忘( fire-and-forget)

发后即忘,它只管往 Kafka 发送,并不关心消息是否正确到达。在大多数情况下,这种发送方式没有问题; 不过在某些时候(比如发生不可重试异常时)会造成消息的丢失。这种发送方式的性能最高,可靠性最差。

1
Future<RecordMetadata> send = producer.send(rcd);

同步发送(sync )

0.8.x 前,有一个参数 producer.type=sycn|asycn 来决定生产者的发送模式;现已失效(新版中,producer 在底层只有异步)

1
2
3
4
5
try {
producer.send(rcd).get();
} catch (Exception e) {
e.printStackTrace();
}

在调用 send 方法后可以接着调用 get() 方法,send 方法的返回值是一个 Future\对象,RecordMetadata 里面包含了发送消息的主题、分区、偏移量等信息。改写后的代码如下:

1
2
3
4
5
6
7
8
9
10
11
for (int i = 0; i < 10; i++) {
try {
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "k" + i, "world" + i);
/*同步发送消息*/
RecordMetadata metadata = producer.send(record).get();
System.out.printf("topic=%s, partition=%d, offset=%s \n",
metadata.topic(), metadata.partition(), metadata.offset());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}

此时得到的输出如下:偏移量和调用次数有关,所有记录都分配到了 0 分区,这是因为在创建 Hello-Kafka 主题时候,使用 --partitions 指定其分区数为 1,即只有一个分区。

1
2
3
4
5
6
7
8
9
10
topic=Hello-Kafka, partition=0, offset=40 
topic=Hello-Kafka, partition=0, offset=41
topic=Hello-Kafka, partition=0, offset=42
topic=Hello-Kafka, partition=0, offset=43
topic=Hello-Kafka, partition=0, offset=44
topic=Hello-Kafka, partition=0, offset=45
topic=Hello-Kafka, partition=0, offset=46
topic=Hello-Kafka, partition=0, offset=47
topic=Hello-Kafka, partition=0, offset=48
topic=Hello-Kafka, partition=0, offset=49

异步发送(async )

回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是 RecordMetadata 和Exception,如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

1
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试

通常我们并不关心发送成功的情况,更多关注的是失败的情况,因此 Kafka 提供了异步发送和回调函数。 代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "k" + i, "world" + i);
/*异步发送消息,并监听回调*/
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.out.println("进行异常处理");
} else {
System.out.printf("topic=%s, partition=%d, offset=%s \n",
metadata.topic(), metadata.partition(), metadata.offset());
}
}
});
}

生产者原理解析

image-20220514225146784.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
0、新建kafka生产实例,参数也是放在kafkaProducer里面

1、Producerinterceptor拦截器,设置特定的规则对消息进行拦截,可以通过指定的消息

2.Serializer序列化器,创建生产者对象时必须指定序列化器,作用就是将key和value转换为二进制

3.Partitioner,topic中有分区,如何分发就是通过此处有规划的分发数据

4.1 RecordAccumulator消息累加器,其中有多个分区,对于每个分区,都会单独维护主要用来缓存消息以便 Sender 线程可以批量发送, 进而减少网络传输的资源消耗以提升性能。

4.2 RecordAccumulator 缓存的大小可以通过生产者客户端参数 buffer.memory 配 置, 默认值为 33554432B ,即 32M。

4.3 如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时KafkaProducer.send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms 的配置,此参数的默认值为 60000,即 60 秒。(此配置可以理解为阻塞时间,在这个范围内不会抛出异常)

4.4主线程中发送过来的消息都会被迫加到 RecordAccumulator 的某个双端队列(Deque )中,RecordAccumulator 内部为每个分区都维护了一个双端队列,即Deque<ProducerBatch>。消息写入缓存时,追加到双端队列的尾部;

4.5、 Sender 读取消息时,从双端队列的头部读取。

4.6 注意:ProducerBatch 是指一个消息批次; 与此同时,会将较小的 ProducerBatch凑成一个较ProducerBatch ,也可以减少网络请求的次数以提升整体的吞吐量。

4.7 ProducerBatch 大小和 batch.size 参数也有着密切的关系。

4.8 当一条消息(ProducerRecord ) 流入RecordAccumulator 时,会先寻找与消息分区所对应的双端队列(如果没有则新建),再从这个双端队列的尾部获取一个ProducerBatch (如果没有则新建),查看 ProducerBatch 中是否还可以写入这个ProducerRecord,如果可以写入,如果不可以则需要创建一个新的 Producer Batch。

4.9 在新建ProducerBatch 时评估这条消息的大小是否超过 batch.size 参数大小, 如果不超过, 那么就以 batch.size 参数的大小来创建 ProducerBatch。

4.10 如果生产者客户端需要向很多分区发送消息, 则可以将 buffer.memory 参数适当调大以增加整体的吞吐量
6.1、 Sender 从 RecordAccumulator 获取缓存的消息之后,会进一步将<分区,Deque<Producer Batch>>的形式转变成<Node,List<ProducerBatch>的形式,其中 Node 表示 Kafka 集群 broker 节点。

6.2 对于网络连接来说,生产者客户端是与具体 broker 节点建立的连接,也就是向具体的 broker 节点发送消息,而并不关心消息属于哪一个分区;

6.3 、 而对于 KafkaProducer 的应用逻辑而言,我们只关注向哪个分区中发送哪些消息,所以在这里需要做一个应用逻辑层面到网络 I/O层面的转换。

6.4 在转换成<Node, List<ProducerBatch>>的形式之后, Sender 会进一步封装成<Node,Request> 的形式, 这样就可以将 Request 请求发往各个 Node 了,这里的 Request 是 Kafka 各种协议请求;

6.5 下一步直接就可以从Request 发送到 Selector 在转到 kafka集群

7.1 缓存操作可以理解为当请求 从 sender 发送给 kafka 集群时候,sender 是不知道是否成功发送,即kafka 是否接收到消息,所以此功能是,当sender只要给 kafka 发送请求,此消息就同步InFlightRequests

7.2 请求在从 sender 线程发往 Kafka 之前还会保存到InFlightRequests 中,InFlightRequests 保存对象的具体形式为Map<Nodeld, Deque<request>>,它的主要作用是缓存了已经发出去但还没有收到服务端响应的请求(Nodeld 是一个 String 类型,表示节点的 id 编号)。

7.3 与此同时,InFlightRequests 还提供了许多管理类的方法,并且通过配置参数还可以限制每个连接(也就是客户端与 Node 之间的连接) 最多缓存的请求数。

7.4 这个配置参数为 max.in.flight.request.per. connection ,默认值为 5,即每个连接最多只能缓存 5 个未响应的请求,超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应
( Response )。

8.1 提交到selector 准备发送

9、发送到 kafka集群

10、当 kafka 集群受到消息 ,集群响应,返回给selector

11.1、selector 回复给 InFlightRequests

11.2、如果没有受到响应, request 则会在 InFlightRequests 一直缓存
11.3 通过比较 Deque<Request> 的 size 与这个参数的大小来判断对应的 Node中是否己经堆积了很多未响应的消息, 如果真是如此, 那么说明这个 Node 节点负载较大或网络连接有问题,再继其发送请求会增大请求超时的可能。

消费者API

一个正常的消费逻辑需要具备以下几个步骤:

1
2
3
4
5
6
(1)配置消费者客户端参数
(2)创建相应的消费者实例;
(3)订阅主题;
(4)拉取消息并消费;
(5)提交消费位移 offset;
(6)关闭消费者实例。
  • 消费者API示例代码(部分截取)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Properties props = new Properties(); 

// 定义 kakfa 服务的地址,不需要将所有 broker 指定上
props.put("bootstrap.servers", "master:9092");

// 指定 consumer group
props.put("group.id", "g1");

// 是否自动提交 offset
props.put("enable.auto.commit", "true");

// 自动提交 offset 的时间间隔
props.put("auto.commit.interval.ms", "1000");

// key 的反序列化类
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

// value 的反序列化类
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

// 如果没有消费偏移量记录,则自动重设为起始 offset:latest, earliest, none
//Earliest目前状态下最前面的一条消息(日志在一定保存时间后会自动清空)
//none(上次记录的偏移量,如果没有,会抛异常)
props.put("auto.offset.reset","earliest");

// 定义 consumer KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 消费者订阅的 topic, 可同时订阅多个
consumer.subscribe(Arrays.asList("first", "test","test1"));

Kafka消费者可选属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
1. fetch.min.byte
消费者从服务器获取记录的最小字节数。如果可用的数据量小于设置值,broker 会等待有足够的可用数据时才会把它返回给消费者。

2. fetch.max.wait.ms
broker 返回给消费者数据的等待时间,默认是 500ms。

3. max.partition.fetch.bytes
该属性指定了服务器从每个分区返回给消费者的最大字节数,默认为 1MB。

4. session.timeout.ms
消费者在被认为死亡之前可以与服务器断开连接的时间,默认是 3s。

5. auto.offset.reset
该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:

latest (默认值) :在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的最新记录);
earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录。
6. enable.auto.commit
是否自动提交偏移量,默认值是 true。为了避免出现重复消费和数据丢失,可以把它设置为 false。

7. client.id
客户端 id,服务器用来识别消息的来源。

8. max.poll.records
单次调用 poll() 方法能够返回的记录数量。

9. receive.buffer.bytes & send.buffer.byte
这两个参数分别指定 TCP socket 接收和发送数据包缓冲区的大小,-1 代表使用操作系统的默认值。
  • 必要参数配置
1
2
3
4
5
6
7
8
9
Properties props = new Properties(); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);

props.put(ConsumerConfig.GROUP_ID_CONFIG,groupid);

props.put(ConsumerConfig.CLIENT_ID_CONFIG,clientid);

subscribe 订阅主题

  • subscribe 有如下重载方法:

    1
    2
    3
    4
    public void subscribe(Collection<String> topics,ConsumerRebalanceListener listener)
    public void subscribe(Collection<String> topics)
    public void subscribe(Pattern pattern, ConsumerRebalanceListener listener)
    public void subscribe(Pattern pattern)
  • 指定集合方式订阅主题

    1
    2
    consumer.subscribe(Arrays.asList(topic1)); 
    consumer subscribe(Arrays.asList(topic2));
  • 正则方式订阅主题

    如果消费者采用的是正则表达式的方式(subscribe(Pattern))订阅, 在之后的过程中,如果有人又创建了新的主题,并且主题名字与正表达式相匹配,那么这个消费者就可以消费到新添加的主题中的消息。如果应用程序需要消费多个主题,并且可以处理不同的类型,那么这种订阅方式就很有效。

    • 正则表达式的方式订阅的示例如下

      1
      consumer.subscribe(Pattern.compile ("topic.*" )); 

assign 订阅主题

消费者不仅可以通过 KafkaConsumer.subscribe() 方法订阅主题,还可直接订阅某些主题的指定分区;

  • 在 KafkaConsumer 中提供了 assign() 方法来实现这些功能,此方法的具体定义如下:

    1
    public void assign(Collection<TopicPartition> partitions);
    1
    这个方法只接受参数 partitions,用来指定需要订阅的分区集合。
    1
    consumer.assign(Arrays.asList(new TopicPartition ("tpc_1" , 0),new TopicPartition(“tpc_2”,1))) ;

subscribe 与 assign 的区别

  • 通过 subscribe()方法订阅主题具有消费者自动再均衡功能 ;

    1
    在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。 当消费组的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移。
  • assign() 方法订阅分区时,是不具备消费者自动均衡的功能的;

    1
    其实这一点从 assign()方法参数可以看出端倪,两种类型 subscribe()都有 ConsumerRebalanceListener 类型参数的方法,而 assign()方法却没有。

取消订阅

可以使用 KafkaConsumer 中的 unsubscribe()方法采取消主题的订阅,这个方法既可以取消通过subscribe( Collection)方式实现的订阅; 也可以取消通过 subscribe(Pattem)方式实现的订阅,还可以取消通过 assign( Collection)方式实现的订阅。示例码如下:

1
consumer.unsubscribe(); 

如果将 subscribe(Collection )或 assign(Collection)集合参数设置为空集合,作用与 unsubscribe()方法相同,如下示例中三行代码的效果相同:

1
2
3
consumer.unsubscribe(); 
consumer.subscribe(new ArrayList<String>()) ;
consumer.assign(new ArrayList<TopicPartition>());

消息的消费模式

Kafka 中的消费是基于拉取模式的。消息的消费一般有两种模式:推送模式和拉取模式。推模式是服务端主动将消息推送给消费者,而拉模式是消费者主动向服务端发起请求来拉取消息。

对于 poll () 方法而言,如果某些分区中没有可供消费的消息,那么此分区对应的消息拉取的结果就为空如果订阅的所有分区中都没有可供消费的消息,那么 poll()方法返回为空的消息集; poll () 方法具体定义如下:
public ConsumerRecordspoll(final Duration timeout)
超时时间参数 timeout , 用来控制 poll() 方法的阻塞时间, 在消费者的缓冲区里没有可用数据时会发生阻塞。如果消费者程序只用来单纯拉取并消费数据,则为了提高吞吐率,可以把 timeout 设置为Long.MAX_VALUE;

  • 消费者消费到的每条消息的类型为 ConsumerRecord

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    public class ConsumerRecord<K, V> { 
    public static final long NO_TIMESTAMP = RecordBatch.NO_TIMESTAMP;
    public static final int NULL_SIZE = -1;
    public static final int NULL_CHECKSUM = -1;
    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp;
    private final TimestampType timestampType;
    private final int serializedKeySize;
    private final int serializedValueSize;
    private final Headers headers;
    private final K key;
    private final V value;
    private volatile Long checksum;
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    topic partition 这两个字段分别代表消息所属主题的名称和所在分区的编号。

    offsset 表示消息在所属分区的偏移量。

    timestamp 表示时间戳,与此对应的 timestampType 表示时间戳的类型。

    timestampType 有两种类型 CreateTime 和 LogAppendTime , 分别代表消息创建的时间戳和消息追加到日志的时间戳。

    headers 表示消息的头部内容。

    key value 分别表示消息的键和消息的值,一般业务应用要读取的就是 value ;

    serializedKeySize、serializedValueSize 分别表示 key、value 经过序列化之后的大小,如果 key 为空, 则 serializedKeySize 值为 -1,同样,如果 value 为空,则 serializedValueSize 的值也会为 -1;

    checksum 是 CRC32 的校验值。

指定位移消费

有些时候,我们需要一种更细粒度的掌控,可以让我们从特定的位移处开始拉取消息,而KafkaConsumer 中的 seek() 方法正好提供了这个功能,让我们可以追前消费或回溯消费。

  • seek()方法的具体定义如下:

    1
    public void seek(TopicPartiton partition,long offset);

再均衡监听器

一个消费组中,一旦有消费者的增减发生,会触发消费者组的 rebalance 再均衡; 如果 A 消费者消费掉的一批消息还没来得及提交 offset, 而它所负责的分区在 rebalance 中转移给了 B 消费者,则有可能发生数据的重复消费处理。此情形下,可以通过再均衡监听器做一定程度的补救;

自动位移提交

Kafka 中默认的消费位移的提交方式是自动提交,这个由消费者客户端参数 enable.auto.commit 配置, 默认值为 true 。当然这个默认的自动提交不是每消费一条消息就提交一次,而是定期提交,这个定期的周期时间由客户端参数 auto.commit.interval.ms 配置, 默认值为 5 秒, 此参数生效的前提是 enable.

auto.commit 参数为 true。

在默认的方式下,消费者每隔 5 秒会将拉取到的每个分区中最大的消息位移进行提交。自动位移提交的动作是在 poll() 方法的逻辑里完成的,在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移。

Kafka 消费的编程逻辑中位移提交是一大难点,自动提交消费位移的方式非常简便,它免去了复杂的位移提交逻辑,让编码更简洁。但随之而来的是重复消费和消息丢失的问题。

  • 重复消费

    假设刚刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次自动提交消费位移之前,消费者崩溃了,那么又得从上一次位移提交的地方重新开始消费,这样便发生了重复消费的现象(对于再均衡的情况同样适用)。我们可以通过减小位移提交的时间间隔来减小重复消息的窗口大小,但这样并不能避免重复消费的发送,而且也会使位移提交更加频繁。

  • 丢失消息

    按照一般思维逻辑而言,自动提交是延时提交,重复消费可以理解,那么消息丢失又是在什么情形下会发生的呢?我们来看下图中的情形: 拉取线程不断地拉取消息并存入本地缓存, 比如在 BlockingQueue 中, 另一个处理线程从缓存中读取消息并进行相应的逻辑处理。设目前进行到了第 y+l 次拉取,以及第 m 次位移提交的时候,也就是x+6 之前的位移己经确认提交了, 处理线程却还正在处理 x+3 的消息; 此时如果处理线程发生了异常, 待其恢复之后会从第 m 次位移提交处,也就是 x+6 的位置开始拉取消息,那么 x+3 至 x+6 之间的消息就没有得到相应的处理,这样便发生消息丢失的现象。

手动位移提交(调用 kafka api)

自动位移提交的方式在正常情况下不会发生消息丢失或重复消费的现象, 但是在编程的世界里异常无可避免; 同时, 自动位移提交也无法做到精确的位移管理。 在 Kafka 中还提供了手动位移提交的方式, 这样可以使得开发人员对消费位移的管理控制更加灵活。
很多时候并不是说拉取到消息就算消费完成,而是需要将消息写入数据库、写入本地缓存,或者是更加复杂的业务处理。在这些场景下,所有的业务处理完成才能认为消息被成功消费; 手动的提交方式可以让开发人员根据程序的逻辑在合适的地方进行位移提交。 开启手动提交功能的前提是消费者客户端参数 enable.auto.commit 配置为 fals ,示例如下:

1
props.put(ConsumerConf.ENABLE_AUTO_COMMIT_CONFIG, false); 

手动提交可以细分为同步提交和异步提交,对应于 KafkaConsumer 中的 commitSync()和commitAsync()两种类型的方法。

Topic管理 API

一般情况下,我们都习惯使用 kafka-topic.sh 本来管理主题,如果希望将管理类的功能集成到公司内部的系统中,打造集管理、监控、运维、告警为一体的生态平台,那么就需要以程序调用 API 方式去实现。这种调用 API 方式实现管理主要利用 KafkaAdminClient 工具类KafkaAdminClient 不仅可以用来管理 broker、配置和 ACL (Access Control List),还可用来管理主题)

列出主题

1
2
3
4
5
ListTopicsResult listTopicsResult = adminClient.listTopics(); 

Set<String> topics = listTopicsResult.names().get();

System.out.println(topics);

查看主题信息

1
2
3
4
5
6
7
8
9
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList("tpc_4", "tpc_3")); 


Map<String, TopicDescription> res = describeTopicsResult.all().get();

Set<String> ksets = res.keySet();
for (String k : ksets) {
System.out.println(res.get(k));
}

创建主题

  • 代码示例(部分截取)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 参数配置
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"master:9092,slave1:9092,slave2:9092");
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG,3000);

// 创建 admin client 对象
AdminClient adminClient = KafkaAdminClient.create(props);

// 由服务端 controller 自行分配分区及副本所在 broker
NewTopic tpc_3 = new NewTopic("tpc_3", 2, (short) 1);

// 手动指定分区及副本的 broker 分配
HashMap<Integer, List<Integer>> replicaAssignments = new HashMap<>();

// 分区 0,分配到 broker0,broker1 replicaAssignments.put(0,Arrays.asList(0,1));
// 分区 1,分配到 broker0,broker2
replicaAssignments.put(0,Arrays.asList(0,1));
NewTopic tpc_4 = new NewTopic("tpc_4", replicaAssignments);
CreateTopicsResult result = adminClient.createTopics(Arrays.asList(tpc_3,tpc_4));

// 从 future 中等待服务端返回
try {
result.all().get();
} catch (Exception e) {
e.printStackTrace();
}
adminClient.close();

删除主题

1
2
3
4
5
DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList("tpc_1", "tpc_1")); 

Map<String, KafkaFuture<Void>> values = deleteTopicsResult.values();

System.out.println(values);

其他管理

除了进行 topic 管理之外,KafkaAdminClient 也可以进行诸如动态参数管理,分区管理等各类管理操作;

powershell