本网站(662p.com)打包出售,且带程序代码数据,662p.com域名,程序内核采用TP框架开发,需要联系扣扣:2360248666 /wx:lianweikj
精品域名一口价出售:1y1m.com(350元) ,6b7b.com(400元) , 5k5j.com(380元) , yayj.com(1800元), jiongzhun.com(1000元) , niuzen.com(2800元) , zennei.com(5000元)
需要联系扣扣:2360248666 /wx:lianweikj
kafka-consumer端核心参数解析
奔跑的男人 · 116浏览 · 发布于2023-06-30 +关注

kafka-consumer端 主要和partition(分区)以及offset相关;

1.partition的基础知识

partition一共有3种类型的身份 leader follower ISR( in-sync replicas )

follower副本的介绍

Kafka的Topic分区本质是一个用于存储Topic下的消息的日志,但是只存一份日志会因为机器损坏或其他原因导致消息丢失不可恢复,因此需要多个相同的日志作为备份,提高系统可用性,这些备份在kafka中被称为副本(replica)。

kafka将分区的所有副本均匀的分配到所有broker上,并从这些副本中选取一个作为leader副本对外提供读写服务,其他副本则被称为follower,只能被动的向leader副本请求数据以此保持和leader副本的状态同步。

ISR同步副本集合

在生产环境下,因为各种不可抗因素,服务可能会发生宕机,例如对外提供服务的leader副本,如果其发生宕机不可用,将会影响系统的使用,因此在leader副本发生宕机时,follower副本就发生作用了,kafka将从follower副本中选取一个作为新的leader副本对外提供服务;

当然,并不是所有的follower副本都有资格成为leader,因为有些follower副本可能因为各种原因,此时保存的数据落后于之前的leader,如果数据落后的follower成为了leader,将会引发消息的丢失因此kafka引入了ISR的概念。

ISR,全称 in-sync replicas,是一组动态维护的同步副本集合,每个topic分区都有自己的ISR列表,ISR中的所有副本都与leader保持同步状态(也包括leader本身),只有ISR中的副本才有资格被选为新的leader,Producer发送消息时,消息只有被全部写到了ISR中,才会被视为已提交状态,若分区ISR中有N个副本,那么该分区ISR最多可以忍受 N-1 个副本崩溃而不丢失消息。

kafka额外对追加了 isr 的概念,相当于指定了参加选举的broker;

如果一个follower副本落后leader的时间持续性的超过了replica.lag.time.max.ms(默认10秒)参数值,那么该follower副本则被认定为不能同步,并将被踢出isr;

2.partition中的offset

Last Committed Offset:consumer group 最新一次 commit 的 offset,表示这个 group 已经把 Last Committed Offset 之前的数据都消费成功了。

Current Position:consumer group 当前消费数据的 offset,也就是说,Last Committed Offset 到 Current Position 之间的数据已经拉取成功,可能正在处理,但是还未commit。

leo:记录底层日志 (log) 中的下一条消息的 offset。对 producer 来说,就是即将插入下一条消息的offset。

HW:高水位线,已经成功备份到 ISR 中的最新一条数据的 offset

HW俗称高水位,HighWatermark的缩写,取一个partition对应的ISR中最小的LEO(log-end-offset)作为HW, consumer最多只能消费到HW所在的位置。另外每个replica都有HW,leader和follower各自负责更新自己的HW的状 态。对于leader新写入的消息,consumer不能立刻消费,leader会等待该消息被所有ISR中的replicas同步后更新HW, 此时消息才能被consumer消费。这样就保证了如果leader所在的broker失效,该消息仍然可以从新选举的leader中获取。对于来自内部broker的读取请求,没有HW的限制。

kafka将leo,hw保存于recovery-point-offset-checkpoint, replication-checkpoint 两个文件中;

3.在consumer中和offset相关的配置

enable.auto.commit

If true the consumer's offset will be periodically committed in the background.

自动提交,则offset 会根据 auto.commit.interval.ms (默认5000)定时刷新;

默认,为了减少消息丢失,我们不会设置自动提交;只要当消费端成功处理消息后,我们才仍为此消息被成功消费!

请结合 Last Committed Offset 和 Current Position 思考

auto.offset.reset

What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):

  • earliest: automatically reset the offset to the earliest offset

earliest 对于同一个消费者组,若没有提交过offset,则从头开始消费

  • latest: automatically reset the offset to the latest offset

latest 对于同一个消费者组,若没有提交过offset,则只消费消费组连接topic后,新产生的数据

  • none: throw exception to the consumer if no previous offset is found for the consumer's group

  • anything else: throw exception to the consumer.

无论何种策略,对于同一个消费者组,若已有提交的offset,则从已提交的offset继续消费

4.rebalance机制

kafka保证同一消费组中的每个consumer能够消费一个或者多个特定的partition数据,一个partition的数据只能被一个consumer消费;因为每个partition里的消息是有序的,这样可以保证partition中的数据被同一个消费者有序消费;同时consumer只需要和自己消费的partition的broker通信就可以,减少开销。

我们应尽可能保证消费者是大于等于partition的;

partition触发reblance的条件:

  • l 条件1:加入了新的consumer

  • l 条件2:consumer退出

  • l 条件3:coordinator故障,集群选举出新的coordinator

  • l 条件4:topic的partition增加

  • l 条件5:consumer调用unsubscrible(),取消topic的订阅

基于zk的rebalance

在kafka0.9版本之前,consumer的rebalance是通过在zookeeper上注册watch完成的。

这种做法很容易带来zk的羊群效应,任何Broker或者Consumer的增减都会触发所有的Consumer的Rebalance,造成集群内大量的调整;同时由于每个consumer单独通过zookeeper判断Broker和consumer宕机,由于zk的脑裂特性,同一时刻不同consumer通过zk看到的表现可能是不一样,这就可能会造成很多不正确的rebalance尝试;除此之外,由于consumer彼此独立,每个consumer都不知道其他consumer是否rebalance成功,可能会导致consumer-group消费不正确。

集群中的羊群效应

在分布式系统Zokeeper集群中,例如某一节点A被大量client进行watch时,当节点A发生变化只对一个客户端有影响;但是由于所有客户端都对该节点进行了watch,导致其他没有影响的client也会受到通知,这种不必要的通知就是分布式中的羊群效应。

Coordinator(协调者)

基于zk的rebalance存在不可避免的羊群效应和脑裂问题,kafka0.9.*的版本重新设计了consumer端,诞生了一个高可用中心Coordinator;进而减少了zookeeper大量的负载。

对于每一个Consumer-Group,Kafka集群为其从broker集群中选择一个broker作为其coordinator。

coordinator主要做两件事:

  • 维持group的成员组成。这包括加入新的成员,检测成员的存活性,清除不再存活的成员。

  • 协调group成员的行为。

Coordinator有如下几种类型:

  • GroupCoordinator:broker端的,每个kafka server都有一个实例,管理部分的consumer group和它们的offset

  • WorkerCoordinator:broker端的,管理GroupCoordinator程序,主要管理workers的分配。

  • ConsumerCoordinator:consumer端的,和GroupCoordinator通信的媒介。

ConsumerCoordinator是KafkaConsumer的一个成员,只负责与GroupCoordinator通信,所以真正的协调者还是GroupCoordinator。

5.消费者如何从kafka中拉取消息

consumer和broker的交互流程

步骤1-FIND_COORDINATOR

consumer-group中的每个consumer启动时会向kafka集群中的某个节点发送 FindCoordinatorRequest 请求来查找对应的组协调器GroupCoordinator,并跟其建立网络连接。

组协调器GroupCoordinator:每个consumer-group都会选择一个broker作为自己的组协调coordinator,负责监控消费组里的所有消费者的心跳,以及判断是否宕机,然后开启消费者rebalance。

组协调器GroupCoordinator选择方式

公式:hash(consumer-group.id) % __consumer_offsets主题的分区数

当消费组决定好协调器后,就开始和其建立心跳;发送heart-beat请求

__consumer_offsets 是记录位点的主体;其默认有50个分区,当然在kafka中只要存在分区,就会存在副本!

步骤2-JoinGroup

步骤3-SyncGroup

JoinGroup返回之后,发送SyncGroup,得到自己所分配到的partition

Rebalance Generation

它表示了rebalance之后的一届成员,主要是用于保护consumer group,隔离无效offset提交的。比如上一届的consumer成员是无法提交位移到新一届的consumer group中。每次group进行rebalance之后,generation号都会加1,表示group进入到了一个新的版本,如下图所示: Generation 1时group有3个成员,随后成员2退出组,coordinator触发rebalance,consumer group进入Generation 2,之后成员4加入,再次触发rebalance,group进入Generation 3.

消费组核心参数

kafka.apache.org/documentati…

spring:
  kafka:
    consumer:
      bootstrap-servers: ${kafka-broker.ips}
      group-id: auto-dev #消费者组
      # earliest:无提交记录,从头开始消费
      # latest:无提交记录,从最新的消息的下一条开始消费
      auto-offset-reset: earliest 
      enable-auto-commit: false #是否自动提交偏移量offset
      auto-commit-interval: 1S #前提是 enable-auto-commit=true。自动提交的频率
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 2
      properties:
        # 如果在这个时间内没有收到心跳,该消费者会被踢出组并触发rebalance
        session.timeout.ms: 120000
        # 最大消费时间。此决定了获取消息后提交偏移量的最大时间,超过设定的时间(默认5分钟),
        # 服务端也会认为该消费者失效。踢出并再平衡
        max.poll.interval.ms: 300000
        # 配置控制客户端等待请求响应的最长时间。 
        # 如果在超时之前没有收到响应,客户端将在必要时重新发送请求,
        # 或者如果重试次数用尽,则请求失败。
        request.timeout.ms: 60000
        #订阅或分配主题时,允许自动创建主题。0.11之前,必须设置false
        allow.auto.create.topics: true
        # poll方法向协调器发送心跳的频率,为session.timeout.ms的三分之一
        heartbeat.interval.ms: 40000 
        # 每个分区里返回的记录最多不超max.partitions.fetch.bytes 指定的字节
        # 0.10.1版本后 如果 fetch 的第一个非空分区中的第一条消息大于这个限制
        # 仍然会返回该消息,以确保消费者可以进行
        #max.partition.fetch.bytes=1048576  #1M
    listener:
      # 当enable.auto.commit的值设置为false时,该值会生效;为true时不会生效
      # manual_immediate:手动调用Acknowledgment.acknowledge()后立即提交
      ack-mode: manual_immediate
      missing-topics-fatal: true #如果至少有一个topic不存在,true启动失败。false忽略
      # type: single 单条消费
      # type: batch 批量消费
      # 批量消费需要配合 consumer.max-poll-records
      type: batch
      # 配置多少,就为为每个消费者实例创建多少个线程。多出分区的线程空闲
      concurrency: 2


相关推荐

PHP实现部分字符隐藏

沙雕mars · 1325浏览 · 2019-04-28 09:47:56
Java中ArrayList和LinkedList区别

kenrry1992 · 908浏览 · 2019-05-08 21:14:54
Tomcat 下载及安装配置

manongba · 970浏览 · 2019-05-13 21:03:56
JAVA变量介绍

manongba · 963浏览 · 2019-05-13 21:05:52
什么是SpringBoot

iamitnan · 1086浏览 · 2019-05-14 22:20:36
加载中

0评论

评论
分类专栏
小鸟云服务器
扫码进入手机网页