黑马头条(6)
Kafka概述
消息中间件 | 建议 |
---|---|
Kafka | 追求高吞吐量,适合产生大量数据的互联网服务的数据收集业务 |
RocketMQ | 可可靠性要求很高的金融互联网领域,稳定性高,经历了多次阿里双11考验 |
RabbitMQ | 性能较好,社区活跃度高,数据量没有那么大,优先选择功能比较完备的RabbitMQ |
Kafka是一个分布式流媒体平台,类似于消息队列或企业消息传递系统
- producer : 发布消息的对象称为主题生产者(Kafka topic producer)
- topic: Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)
- consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumer)
- broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。
环境准备
zookeeper准备
docker pull zookeeper:3.4.14
docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
Kafka准备
docker pull wurstmeister/kafka:2.12-2.3.1
# 启动一个新的 Kafka 容器,指定容器名称为 kafka
docker run -d --name kafka \
# 设置 Kafka 广播的主机名,以便其他 Kafka 客户端能够连接到此主机
--env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \
# 设置 Kafka 连接的 ZooKeeper 地址,ZooKeeper 是 Kafka 的协调服务
--env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \
# 设置 Kafka 向外界广播的监听地址,告知客户端连接的地址
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \
# 设置 Kafka 本地的监听地址,Kafka 监听在所有的网络接口上
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
# 设置 Kafka 进程的 Java 堆内存参数,分配最大256M和最小256M内存
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
# 设置网络模式为主机模式,容器将使用主机的网络配置
--net host \
# 使用 wurstmeister/kafka 镜像,并指定版本为 2.12-2.3.1
wurstmeister/kafka:2.12-2.3.1
入门
ConsumerQuickStart.java
public class ConsumerQuickStart {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
KafkaConsumer<String, String> consumer = new KafkaConsumer(properties);
consumer.subscribe(Collections.singletonList("topic-first"));
while (true) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.key());
System.out.println(consumerRecord.value());
}
}
}
}
ProducerQuickStart.java
public class ProducerQuickStart {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 创建kafka生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
ProducerRecord<String, String> kvProducerRecord = new ProducerRecord<>("topic-first", "key-001", "hello kafka");
producer.send(kvProducerRecord);
// 关闭通道,必须关闭否则发送不成功
producer.close();
}
}
所欲消费者都能收到消息
生产者发送消息,多个消费者订阅同一个主题,所有消费者都能收到消息(一对多)
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
分区策略
Kafka中的分区机制指的是将每个主题划分为多个分区(Partition)
可以处理更多的消息,不受单台服务器的限制,可以不受限的处理更多是数据
分区策略 | 说明 |
---|---|
轮询策略 | 按顺序轮流将每条数据分配到每个分区中 |
随机策略 | 每次都随机地将消息分配到每个分区 |
按键保存策略 | 生产者发送数据的时候,可以指定一个key ,计算这个key 的hashCode 值,按照hashCode 的值对不同消息进行存储 |
⭐Kafka高可用设计
集群
- Kafka的服务端由被称为Broker的服务进程构成,即一个Kafka集群由多个Broker组成
- 这样如果集群中某一台机器宕机,其他机器上的Broker依然能够对外提供服务
备份机制(Replication)- 同步方式
Kafka中的消息的备份又叫做副本 (Replica)
Kafka定义了两类副本:
- 领导者副本(Leader Replica)
- 追随者副本(Follower Replica)
追随者副本又分为两类:
- ISR(in-sync replica)需要同步复制保存的follower
- 普通副本,异步复制保存
如果 leader 失效后,需要选出新的leader,选举的原则如下:
第一:选举时优先从ISR中选定,因为这个列表中follower的数据是与leader同步的。
第二:如果ISR列表中的follower都不行了,就只能从其他follower中选取。
极端情况下-所有副本都失效了,两种方案
- 第一:等待ISR中的一个活过来,选为Leader,数据可靠,但活过来的时间不确定
- 第二:选择第一个活过来的Replication,不一定是ISR中的,选为leader,以最快速度恢复可用性,但数据不一定完整。
Kafka生产者详解
发送类型
- 同步发送
使用send()方式发送,它会返回一个Future对象,调用get()方法进行等待,就可以知道消息是否发送成功
// 同步方式会产生阻塞
RecordMetadata recordMetadata = producer.send(kvProducerRecord).get();
System.out.println(recordMetadata.offset());
- 异步发送
// 异步发送消息
producer.send(kvProducerRecord, (recordMetadata1, e) -> {
if (e != null) {
System.out.println("记录异常信息到日志表中");
}
System.out.println(recordMetadata1.offset());
});
参数详解
ack
确认机制 | 说明 |
---|---|
acks = 0 | 生产者在成功写入消息之前不会等待任何来自服务器的响应,消息有丢失的风险,但是速度最快 |
acks = 1 (默认值) | 只要集群首领节点收到消息,生产者就会收到一个来自服务器的成功响应 |
acks = all | 只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应 |
retries
生产者从服务器收到的错误有可能时临时性错误,在这种情况下,retries
参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试返回错误,默认情况下,生产者会在每次重试之间等待100ms
prop.put(ProducerConfig.RETRIES_CONFIG, 10);
消息压缩
默认情况下,消息发送时不会被压缩
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
压缩算法 | 说明 |
---|---|
snappy | 占用较少的 CPU,但能够提供较好的性能和相当可观的压缩比。如果看重性能和网络带宽,建议采用。 |
lz4 | 占用较少的 CPU,压缩和解压缩速度较快,压缩比也很可观。 |
gzip | 占用较多的 CPU,但会提供更高的压缩比,网络带宽有限,可以使用这种算法。 |
使用压缩可以降低网络传输开销和存储开销,而这往往时向Kafka发送消息的瓶颈所在。
Kafka消费者详解
消费者组
- 消费者组(Consumer Group):指的就是由一个或多个消费者组成的群体
- 一个发布在Topic上消息被分发给消费者组中的一个消费者
- 所有的消费者都在一个组中,那么这个就成了queue模型
- 所有的消费者都在不同的组中,那么就完全成立发布-订阅模型
消息有序性
topic分区中消息只能由消费者中的唯一一个消费者处理,所以消息是按照先后顺序进行处理的。但是其仅仅保证Topic的一个分区顺序处理,不能保证跨分区的消息先后处理顺序。所以,如果想要顺序的处理Topic的所有消息,那就只提供一个分区
提交和偏移量
kafka不会像其他 JMS 队列那样需要得到消费者的确认,消费者可以使用
kafka 来追踪消息在分区的位置(偏移量),消费者会往一个叫做_consumer_offset
的特殊主题发送消息,消息包含了每个分区的偏移量。如果消费者发生崩溃或有新的消费者加入群组,就会触发再均衡。
偏移量提交方式
- 自动提交偏移量
当enable.auto.commit
被设置为true
,提交方式就是让消费者自动提交偏移量,每隔5秒消费者会自动把从poll()
方法接受的最大偏移量提交上去
- 手动提交
当enable.auto.commit
被设置未false
可以有以下三种提交方式
- 提交当前偏移量(同步提交)
while (true) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.key());
System.out.println(consumerRecord.value());
System.out.println(consumerRecord.offset());
System.out.println(consumerRecord.partition());
try{
// 同步提交偏移量
consumer.commitSync();
}catch (CommitFailedException e){
System.out.println("记录异常信息到日志表中, 异常:" + e);
}
}
}
- 异步提交
可能导致偏移量覆盖
while (true) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.key());
System.out.println(consumerRecord.value());
System.out.println(consumerRecord.offset());
System.out.println(consumerRecord.partition());
// 异步方式提交偏移量
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
if (e != null) {
System.out.println("记录错误的提交偏移量 " + map + ", 异常信息为 " + e);
}
}
});
}
- 同步和异步组合提交
// 异步和同步提交偏移量同时使用
try {
while (true) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.key());
System.out.println(consumerRecord.value());
System.out.println(consumerRecord.offset());
System.out.println(consumerRecord.partition());
}
consumer.commitAsync();
}
} catch (Exception e) {
e.printStackTrace();
System.out.println("记录错误的信息" + e);
} finally {
// 同步提交偏移量
consumer.commitSync();
}