MQ(2)消息队列

引言

由MQ(1)消息队列文章我们知道Kafka采用发布/订阅队列,区别于RabbitMQ的队列模式,队列模型每条消息只能被一个消费者消费,而发布/订阅模型就是为让一条消息可以被多个消费者消费而生的,当然队列模型也可以通过消息全量存储至多个队列来解决一条消息被多个消费者消费问题,但是会有数据的冗余。
接下来的内容都基于发布/订阅模型Kafka。

Kafka


一般我们称发送消息方为生产者 Producer,接受消费消息方为消费者Consumer,消息队列服务端为Broker。消息从Producer发往Broker,Broker将消息存储至本地,然后Consumer从Broker拉取消息,或者Broker推送消息至Consumer,最后消费。

1. Broker
Kafka 集群包含一个或多个服务器,服务器节点称为broker。broker存储topic的数据。如果某topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。刚好分布均匀。
如果某topic有N个partition,集群有(N+M)个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据。
如果某topic有N个partition,集群中broker数目少于N个,那么一个broker存储该topic的一个或多个partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致Kafka集群数据不均衡。
2. topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。
3. Partition(分区)
为了提高并发度,Kafka引入了分区Partition的概念,在RocketMQ中也叫队列,本质一样。即消息是发往一个主题下的某个分区中。例如某个主题下有 5 个分区,那么这个主题的并发度就提高为 5 ,同时可以有 5 个消费者并行消费该主题的消息,每个topic至少有一个partition。。一般可以采用轮询或者 key hash 取余等策略来将同一个主题的消息分配到不同的队列中。每个partition中的数据使用多个segment文件存储。partition中的数据是有序的,不同partition间的数据丢失了数据的顺序。如果topic有多个partition,消费数据时就不能保证数据的顺序。在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1。

3. Consumer Group
与之对应的消费者一般都有组的概念 Consumer Group, 即消费者都是属于某个消费组的。一条消息会发往多个订阅了这个主题的消费组。假设现在有两个消费组分别是Group 1 和 Group 2,它们都订阅了Topic-a。此时有一条消息发往Topic-a,那么这两个消费组都能接收到这条消息。
然后这条消息实际是写入Topic某个分区中,消费组中的某个消费者对应消费一个分区的消息。
在物理上除了副本拷贝之外,一条消息在Broker中只会有一份,每个消费组会有自己的offset即消费点位来标识消费到的位置。在消费点位之前的消息表明已经消费过了。当然这个offset是队列级别的。每个消费组都会维护订阅的Topic下的每个队列的offset。
5. Producer(生产者)
生产者即数据的发布者,该角色将消息发布到Kafka的topic中。broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition。
6. Leader and Follower
每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从“in sync replicas”(ISR)列表中删除,重新创建一个Follower。

1.Kafka 的分区策略有哪些?

所谓分区策略就是决定⽣产者将消息发送到哪个分区的算法。

  1. 轮询策略:默认的分区策略,⾮常优秀的负载均衡表现,它总是能保证消息最⼤限度地被平均分配到所有分区上;
  2. 随机策略:实现随机策略版的 partition ⽅法;
  3. 按消息键保序策略:也称 Key-Ordering 策略,可以保证同⼀个 Key 的所有消息都进⼊到相同的分区⾥,由于每个分区下的消息处理是有顺序的,所以称之为消息键保序策略;

2.消息队列中如何保证数据消息不丢失?

消息丢失是下游系统没收到上游系统发送的消息,造成系统间数据不一致。比如,订单系统没有把成功状态的订单消息成功发送到消息队列里,造成下游的统计系统没有收到下单成功订单的消息,于是造成系统间数据的不一致,从而引起用户查看个人订单列表时跟实际不相符的问题。首先分析消息队列的流程,消息丢失的情况分别可能发生在生产端,Kafka服务端,消费端。

(1)生产端需要保证不少生产消息
  • 使用带有回调方法的 API 时,我们可以根据回调函数得知消息是否发送成功,如果发送失败了,我们要进行异常处理,比如把失败消息存储到本地硬盘或远程数据库,等应用正常了再发送,这样才能保证消息不丢失。
  • 设置参数 acks=-1。acks 这个参数是指有多少分区副本收到消息后,生产者才认为消息发送成功了,可选的参数值有 0、1 和 -1。 acks=0,表示生产者不等待任何服务器节点的响应,只要发送消息就认为成功。 acks=1,表示生产者收到 leader 分区的响应就认为发送成功。 acks=-1,表示只有当 ISR中的副本全部收到消息时,生产者才会认为消息生产成功了。这种配置是最安全的,因为如果 leader 副本挂了,当 follower 副本被选为 leader 副本时,消息也不会丢失。但是系统吞吐量会降低,因为生产者要等待所有副本都收到消息后才能再次发送消息。
  • 第三个,设置参数 retries=3。参数 retries 表示生产者生产消息的重试次数。这里 retries=3 是一个建议值,一般情况下能满足足够的重试次数就能重试成功。但是如果重试失败了,对异常处理时就可以把消息保存到其他可靠的地方,如磁盘、数据库、远程缓存等,然后等到服务正常了再继续发送消息。
  • 第四个,设置参数 retry.backoff.ms=300。retry.backoff.ms 指消息生产超时或失败后重试的间隔时间,单位是毫秒。如果重试时间太短,会出现系统还没恢复就开始重试的情况,进而导致再次失败。结合我个人经验来说,300 毫秒还是比较合适的。 只要上面这四个要点配置对了,就可以保证生产端的生产者不少生产消息了。
(2)服务端保证不丢消息
  • 第一个,设置 replication.factor >1。replication.factor 这个参数表示分区副本的个数,这里我们要将其设置为大于 1 的数,这样当 leader 副本挂了,follower 副本还能被选为 leader 副本继续接收消息。
  • 第二个,设置 min.insync.replicas >1。min.insync.replicas 指的是 ISR 最少的副本数量,原理同上,也需要大于 1 的副本数量来保证消息不丢失。 这里我简单介绍下 ISR。ISR 是一个分区副本的集合,每个分区都有自己的一个 ISR 集合。但不是所有的副本都会在这个集合里,首先 leader 副本是在 ISR 集合里的,如果一个 follower 副本的消息没落后 leader 副本太长时间,这个 follower 副本也在 ISR 集合里;可是如果有一个 follower 副本落后 leader 副本太长时间,就会从 ISR 集合里被淘汰出去。也就是说,ISR 里的副本数量是小于或等于分区的副本数量的。
  • 第三个,设置 unclean.leader.election.enable = false。unclean.leader.election.enable 指是否能把非 ISR 集合中的副本选举为 leader 副本。unclean.leader.election.enable = true,也就是说允许非 ISR 集合中的 follower 副本成为 leader 副本。如果设置成这样会有什么问题呢?假设 ISR 集合内的 follower1 副本和 ISR 集合外的 follower2 副本向 leader 副本拉取消息(如下图 1),也就是说这时 ISR 集合中就有两个副本,一个是 leader 副本,另一个是 follower1 副本,而 follower2 副本由于网络或自身机器的原因已经落后 leader 副本很长时间,已经被踢出 ISR 集合。

    突然 leader 和 follower1 这两个副本挂了,由于 unclean.leader.election.enable = true,而现在分区的副本能正常工作的仅仅剩下 follower2 副本,所以 follower2 最终会被选为新的 leader 副本并继续接收生产者发送的消息,我们可以看到它接收了一个新的消息 5。

    如果这时 follower1 副本的服务恢复,又会发生什么情况呢?由于 follower 副本要拉取 leader 副本同步数据,首先要获取 leader 副本的信息,并感知到现在的 leader 副本的 LEO 比自己的还小,于是做了截断操作,这时 4 这个消息就丢了,这就造成了消息的丢失。

    因此,我们一定要把 unclean.leader.election.enable 设置为 false,只有这样非 ISR 集合的副本才不会被选为分区的 leader 副本。但是这样做也降低了可用性,因为这个分区的副本没有 leader,就无法收发消息了,但是消息会发送到别的分区 leader 副本,也就是说分区的数量实际上减少了。

    (3)消费端不能少消费消息
  • 消费者消费消息是有两个步骤的,首先拉取消息,然后再处理消息。向服务端提交消息偏移量可以手动提交也可以自动提交。如果把参数 enable.auto.commit 设置为 true 就表示消息偏移量是由消费端自动提交,由异步线程去完成的,业务线程无法控制。如果刚拉取了消息之后,业务处理还没进行完,这时提交了消息偏移量但是消费者却挂了,这就造成还没进行完业务处理的消息的位移被提交了,下次再消费就消费不到这些消息,造成消息的丢失。因此,只要关闭自动提交offset,在处理完消费的业务逻辑后手动提交offset,就可以保证数据不丢失。

3.如何保证消息不被重复消费?或者说,如何保证消息消费的幂等性?

要保证消息不被重复复消费,其实就是要保证消息消费时的幂等性。幂等性:⽆论你重复请求多少次,得到的结果都是⼀样的。例如:⼀条数据重复出现两次,数据库⾥就只有⼀条数据,这就保证了系统的幂等性。那么如何保证幂等性呢?

  • 消费端写数据时,先根据主键查⼀下这条数据是否存在,如果已经存在则 update;
  • 数据库的唯⼀键约束也可以保证不会重复插⼊多条,因为重复插⼊多条只会报错,不会导致数据库中出现脏数据;
  • 消费端v如果是写 redis,就没有问题,因为 set 操作是天然幂等性的。

4.如何保证消息的顺序性?

  • ⼀个 Topic,⼀个 Partition,⼀个 Consumer,内部单线程消费,单线程吞吐量太低,⼀般不会⽤这个。
  • 写 N 个内存 Queue,具有相同 key 的数据都到同⼀个内存 Queue;然后对于 N 个线程,每个线程分别消费⼀个内存 Queue 即可,这样就能保证顺序性。
作者:程序员LaoChen原文地址:https://segmentfault.com/a/1190000041774339

%s 个评论

要回复文章请先登录注册