kafkakafka批量消费消息端多长时间拉取一次消息

博客分类:
大家都很关心kafka消息阻塞的情况(感谢RoctetMQ给我们的教训)。Kafka上线也有一段时间了,确实有出现过消息阻塞的情况,虽然不影响业务而且用临时办法解决了,但是我觉得可以跟大家总结一下。为了不引起大家的恐慌,我决定先把结论写出来:comsumer 非正常的rebalancing(重新分配分区)才会导致消费阻塞,如果不出现rebalancing,消息是不是重复消费或阻塞。
以下是这两个BUG的描述,这可能需要一些Kafka的知识,我会说得通俗一点,同也会留下一些参考给有兴趣的童鞋进一步了解。
1. 消费者处理过慢可能会导致重复消费
线上场景:BPM会订阅消费然后把流程信息一条一条索引到ElasticSearch,当索引处理较慢(30s)的时候会出现。
重现步骤:/richarde6ee5bad4e9d56 ,很普通的代码,消费代码段加上sleep(30s)。
产生原因:Kafka是使用poll()(长轮询)拉取消息,流程可以简单理解为: 拉取消息-&向kafka broker发送心跳-&提交 offset。当消费者处理过慢(session timeout为30s)没有向kafka broker发送心跳,而且没有提交 offset,broker就会发起rebalancing,这个分区就会分配给其他消费者重复消费。最坏的情况是一直在rebalancing,新的消息都不会被消费。
临时办法:排查发现线上业务线正常场景没有消费处理过慢的场景,processor组件维护了一个线程池,每条消息都用一个线程处理。tasker-center会不断地把消息放在java的ArrayBlockingQueue。其他周边后端服务(如存储)这种处理能力难以评估的,可以尝试一下先把消息缓存到本地队列再做批量插入的操作,其实很多日志类或大数据类使用Kafka都是这样做的,如Flume。
如果确实有这样的场景,请联系我,可以通过通过两个参数减小这问题发生,但都不是完美解决问题。1) 增加session time的时间,但同时也会增加客户端失败的时间。2)减小分区拉取值(max.partition.fetch.bytes默认为1M), 但会影响吞量,而且以bytes为单位也无法评估消息的数量。
修复计划:Kafka社区专门针对这个问题写了篇WIKI https://cwiki.apache.org/confluence/pages/viewpage.action?pageId= 。这是他们的改进计划,内容有目的(Motivation), 计划修改(Proposed Change), 新增或修改的公共接口(New or Changed Public Interfaces),升级计划和兼容性(Migration Plan and Compatibility)和已放弃方案(Rejected Alternatives)。@听风,可以参考一下他的文档模板,用于轩辕组件的改进计划模板挺好的。这个BUG暂定为随着Kafka版本的升级来修复。
2. 多分区多consumer时rebalancing可能会导致某个分区阻塞。
线上场景:发生在在cart-processor,每个topic有18个分区,每个cart-processor有两个consumer(不同groupId),8个cart-processor节点。当cart-processor发版(节点增加或删除)会引起rebalancing,这可能导致个topic的分区阻塞。
重现步骤:代码/richard2011/d92caaa4af,创建18个分区的topic, 通过不断地增加或删除,同时通过kafka-consumer-groups.sh命令查看分区情况,直到出现分区阻塞。
产生原因:kafka client的BUG( https://issues.apache.org/jira/browse/KAFKA-2978 ),Kafka github主分支已修复,但是还没有release版,越多分区和consumer越容易出现这个问题。
临时办法:每次使用kafka的程序发版时,用kafka-consumer-groups.sh命令查看分区情况,发现分区阻塞则重启对应的机器。同时也写了小工具,使comsumer再次rebalancing。
修复计划:观察线上出现的频率,如果频繁出现,将会修复kafka client代码,出现不频繁则随Kafka升级修复。
浏览: 75853 次
来自: 北京
(window.slotbydup=window.slotbydup || []).push({
id: '4773203',
container: s,
size: '200,200',
display: 'inlay-fix'Kafka概述—消息队列 -
- CSDN博客
Kafka概述—消息队列
消息队列的特点:
生产者消费者模式
先进先出(FIFO)顺序保证
可靠性保证:
- 自己不丢数据(Kafka默认是7天)
- 消费者不丢数据:&至少一次,严格一次&
至少一次就是可能会有两次,会重
严格一次机制就是会负责一点
消息队列常见场景:
系统之间解耦合
- queue模型
- publish-subscribe模型
峰值压力缓冲
Kafka是一个高吞吐的分布式消息系统
Apache kafka is publish-subscribe messaging rethought as a distributed commit log.
Kafka的架构
producer: 消息的生产者
consumer: 消息消费者
broker: kafka集群的server,负责处理消息读、写请求,存储消息
topic: 消息队列/分类
架构里面有些元信息是存在Zookeeper上,整个集群的管理也和Zookeeper有很大的关系
Kafka的消息存储和生产消费模型
一个topic分成多个partition,每个partition内部消息强有序,其中的每个消息都有一个序号叫offset。
一个partition只对应一个broker,一个broker可以管多个partition。
消息不经过内存缓冲,直接写入文件。
根据时间策略删除,而不是消费完就删除。
producer自己决定往哪个partition写消息,可以是轮询的负载均衡或者是基于hash的partition策略。
Kafka里面的消息是由topic来组织的,可以想象为一个队列,一个队列就是一个topic,然后它把每个topic又分为多个partition,这个是为了做并行,在每个partition里面是有序的,相当于有序的队列,其中每个消息都有序号offset,从前往后写。
一个partition对应一个broker,一个broker可以管多个partition,比如,topic有6个partition,有两个broker,那每个broker就管3个partition。
这个partition可以很简单想象为一个文件,当数据发过来的时候它就往这个partition上面append,追加就行,kafka和很多消息系统不一样,很多消息系统是消费完了我就把它删掉,而kafka是根据时间策略删除,而不是消费完就删除,在kafka里面没有一个消费完这么个概念,只有过期这样一个概念,这个模型带来了很多个好处。
consumer自己维护消费到哪个offset,每个consumer都有对应的group,group内是queue消费模型,各个consumer消费不同的partition,因此一个消息在group内只消费一次。group间是publish-subscribe消费模型,各个group各自独立消费,互不影响,因此一个消息只被每个group消费一次。
Kafka的特点:
1.消息系统的特点:生存者消费者模型,FIFO
消息系统基本的特点是保证了,有基本的生产者消费者模型,partition内部是FIFO的,partition之间呢不是FIFO的,当然我们可以把topic设为一个partition,这样就是严格的FIFO。
2.高性能:单节点支持上千个客户端,百MB/s吞吐
接近网卡的极限。
3.持久性:消息直接持久化在普通磁盘上且性能好
直接写到磁盘里面去,就是直接append到磁盘里面去,这样的好处是直接持久话,数据不会丢,第二个好处是顺序写,然后消费数据也是顺序的读,所以持久化的同时还能保证顺序,比较好,因为磁盘顺序读比较好。
4.分布式:数据副本冗余、流量负载均衡、可扩展
分布式,数据副本,也就是同一份数据可以到不同的broker上面去,也就是当一份数据,磁盘坏掉的时候,数据不会丢失,比如3个副本,就是在3个机器磁盘都坏掉的情况下数据才会丢,在大量使用情况下看这样是非常好的,负载均衡,可扩展,在线扩展,不需要停服务的。
5.很灵活:消息长时间持久化+Client维护消费状态
有人可能会说kafka写磁盘,会不会是瓶颈,其实不会而且是非常好的,为什么是非常好的,因为kafka写磁盘是顺序的,所以不断的往前产生,不断的往后写,kafka还用了sendFile的0拷贝技术,提高速度,而且还用到了批量读写,一批批往里写,64K为单位,100K为单位,每一次网络传输量不会特别小,RTT(RTT:Round-TripTime往返时间)的开销就会微不足道,对文件的操作不会是很小的IO,也会是比较大块的IO。
从WIKI的定义中,“零拷贝”是指计算机操作的过程中,CPU不需要为数据在内存之间的拷贝消耗资源。而它通常是指计算机在网络上发送文件时,不需要将文件内容拷贝到用户空间(User Space)而直接在内核空间(Kernel Space)中传输到网络的方式。
我的热门文章kafka知识点:consumer消费消息_达内昆明Java培训
kafka知识点:consumer消费消息
昆明Java培训的老师这一期给大家讲consumer消费消息。
6.1 consumer API
kafka提供了两套consumer API:
high-level Consumer API
SimpleConsumer API
其中high-level consumer API提供了一个从kafka消费数据的高层抽象,而SimpleConsumer API则需要开发人员更多地关注细节。
high-level consumer API
high-level
consumer API提供了consumer group的语义,一个消息只能被group内的一个consumer所消费,且consumer消费消息时不关注offset,最后一个offset由zookeeper保存。
使用high-level
consumer API可以是多线程的应用,应当注意:
1.如果消费线程大于patition数量,则有些线程将收不到消息
2.如果patition数量大于线程数,则有些线程多收到多个patition的消息
3.如果一个线程消费多个patition,则无法保证你收到的消息的顺序,而一个patition内的消息是有序的
SimpleConsumer API
如果你想要对patition有更多的控制权,那就应该使用SimpleConsumer API,比如:
1.多次读取一个消息
2.只消费一个patition中的部分消息
3.使用事务来保证一个消息仅被消费一次
但是使用此API时,partition、offset、broker、leader等对你不再透明,需要自己去管理。你需要做大量的额外工作:
1.必须在应用程序中跟踪offset,从而确定下一条应该消费哪条消息
2.应用程序需要通过程序获知每个Partition的leader是谁
3.需要处理leader的变更
使用SimpleConsumer API的一般流程如下:
1.查找到一个“活着”的broker,并且找出每个partition的leader
2.找出每个partition的follower
3.定义好请求,该请求应该能描述应用程序需要哪些数据
4. fetch数据
5.识别leader的变化,并对之作出必要的响应
以下针对high-level
Consumer API进行说明。
6.2 consumer
如2.2节所说,kafka的分配单位是patition。每个consumer都属于一个group,一个partition只能被同一个group内的一个consumer所消费(也就保障了一个消息只能被group内的一个consuemr所消费),但是多个group可以同时消费这个partition。
kafka的设计目标之一就是同时实现离线处理和实时处理,根据这一特性,可以使用spark/Storm这些实时处理系统对消息在线处理,同时使用Hadoop批处理系统进行离线处理,还可以将数据备份到另一个数据中心,只需要保证这三者属于不同的consumer group。
6.3消费方式
consumer采用pull模式从broker中读取数据。
push模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。
对于Kafka而言,pull模式更合适,它可简化broker的设计,consumer可自主控制消费消息的速率,同时consumer可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。
6.4 consumer
delivery guarantee
如果将consumer设置为autocommit,consumer一旦读到数据立即自动commit。如果只讨论这一读取消息的过程,那Kafka确保了Exactly once。
但实际使用中应用程序并非在consumer读取完数据就结束了,而是要进行进一步处理,而数据处理与commit的顺序在很大程度上决定了consumer delivery guarantee:
1.读完消息先commit再处理消息。
这种模式下,如果consumer在commit后还没来得及处理消息就crash了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这就对应于At most once
2.读完消息先处理再commit。
这种模式下,如果在处理完消息之后commit之前consumer crash了,下次重新开始工作时还会处理刚刚未commit的消息,实际上该消息已经被处理过了。这就对应于At least once。
3.如果一定要做到Exactly once,就需要协调offset和实际操作的输出。
精典的做法是引入两阶段提交。如果能让offset和操作输入存在同一个地方,会更简洁和通用。这种方式可能更好,因为许多输出系统可能不支持两阶段提交。比如,consumer拿到数据后可能把数据放到HDFS,如果把最新的offset和数据本身一起写到HDFS,那就可以保证数据的输出和offset的更新要么都完成,要么都不完成,间接实现Exactly once。(目前就high-level API而言,offset是存于Zookeeper中的,无法存于HDFS,而SimpleConsuemr API的offset是由自己去维护的,可以将之存于HDFS中)
总之,Kafka默认保证At least
once,并且允许通过设置producer异步提交来实现At most once。而Exactly once要求与外部存储系统协作,幸运的是kafka提供的offset可以非常直接非常容易得使用这种方式。
6.5 consumer
当有consumer加入或退出、以及partition的改变(如broker加入或退出)时会触发rebalance。consumer rebalance算法如下:
1.将目标topic下的所有partirtion排序,存于PT
2.对某consumer
group下所有consumer排序,存于CG,第i个consumer记为Ci
N=size(PT)/size(CG),向上取整
4.解除Ci对原来分配的partition的消费权(i从0开始)
5.将第i*N到(i+1)*N-1个partition分配给Ci
在0.8.*版本,每个consumer都只负责调整自己所消费的partition,为了保证整个consumer group的一致性,当一个consumer触发了rebalance时,该consumer group内的其它所有其它consumer也应该同时触发rebalance。这会导致以下几个问题:
1.Herd effect
任何broker或者consumer的增减都会触发所有的consumer的rebalance
2.Split Brain
每个consumer分别单独通过zookeeper判断哪些broker和consumer宕机了,那么不同consumer在同一时刻从zookeeper看到的view就可能不一样,这是由zookeeper的特性决定的,这就会造成不正确的reblance尝试。
3.调整结果不可控
所有的consumer都并不知道其它consumer的rebalance是否成功,这可能会导致kafka工作在一个不正确的状态。
基于以上问题,kafka设计者考虑在0.9.*版本开始使用中心coordinator来控制consumer rebalance,然后又从简便性和验证要求两方面考虑,计划在consumer客户端实现分配方案。
了解详情请登陆昆明达内Java培训官网(km.)!
相信自己有巨大的潜能,人在关键时刻的进步是惊人的;心态决定着你的成败,努力去寻找你最近的不良心态,并努力去改变,用积极的心态促使你考试成功。今年就考这些题,最后一个月就做这些题,平均提分50多分。
今天昆明Java培训小编为各位小伙伴带来的是12个java的一些基本知识,这些小知识可能在日常工作中被忽略,希望对各位小伙伴能有所帮助。
今天昆明Java培训小编为大家介绍一下,在java中如何在使用虚拟机的时候,高效利用其内存。
经过一年意识到以前也有很多认识误区,比如:
偏爱收集,经常收集各种资料视频塞满一个个硬盘,然后心满意足的看着容量不行动。
Copyright (C)
All Rights Reserved
选择城市和中心
达内北京亦庄大学生实训基地
达内北京网络营销中心
达内北京会计中心
了解更多干货Access denied |
used Cloudflare to restrict access
Please enable cookies.
What happened?
The owner of this website () has banned your access based on your browser's signature (3b7d1da997a728a6-ua98).kafka 明明能看到消息数据,为何就是收不到信息?
要评论问题请先或
- 这个代码有点难
赞同来自: 、
要参与问题请先或
- Do all in command line
要参与问题请先或
要参与问题请先或
浏览: 9670
关注: 3 人

我要回帖

更多关于 kafka消费后删除消息 的文章

 

随机推荐